import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
import org.opendaylight.controller.cluster.datastore.modification.Modification;
-import org.opendaylight.controller.cluster.datastore.modification.ModificationPayload;
import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
import org.opendaylight.controller.cluster.datastore.utils.MessageTracker;
// Replication consensus reached, proceed to commit
finishCommit(clientActor, identifier);
}
- } else if (data instanceof ModificationPayload) {
- try {
- applyModificationToState(clientActor, identifier, ((ModificationPayload) data).getModification());
- } catch (ClassNotFoundException | IOException e) {
- LOG.error("{}: Error extracting ModificationPayload", persistenceId(), e);
- }
} else if (data instanceof CompositeModificationPayload) {
Object modification = ((CompositeModificationPayload) data).getModification();
import java.io.IOException;
import java.net.URI;
import java.util.Set;
-import org.opendaylight.controller.cluster.datastore.modification.ModificationPayload;
import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
import org.opendaylight.controller.cluster.datastore.node.utils.transformer.NormalizedNodePruner;
import org.opendaylight.controller.cluster.datastore.utils.PruningDataTreeModification;
if (payload instanceof DataTreeCandidatePayload) {
DataTreeCandidates.applyToModification(transaction, ((DataTreeCandidatePayload)payload).getCandidate());
size++;
- } else if (payload instanceof ModificationPayload) {
- MutableCompositeModification.fromSerializable(
- ((ModificationPayload) payload).getModification()).apply(transaction);
- size++;
} else if (payload instanceof CompositeModificationPayload) {
MutableCompositeModification.fromSerializable(
((CompositeModificationPayload) payload).getModification()).apply(transaction);
} else {
log.error("{}: Unknown payload {} received during recovery", shardName, payload);
}
- } catch (IOException | ClassNotFoundException e) {
+ } catch (IOException e) {
log.error("{}: Error extracting ModificationPayload", shardName, e);
}
}
+++ /dev/null
-/*
- * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.datastore.modification;
-
-import com.google.protobuf.GeneratedMessage.GeneratedExtension;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.Externalizable;
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutput;
-import java.io.ObjectOutputStream;
-import java.util.Map;
-import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
-import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages.AppendEntries.ReplicatedLogEntry;
-
-/**
- * Payload implementation for MutableCompositeModification used for persistence and replication.
- *
- * @author Thomas Pantelis
- */
-public class ModificationPayload extends Payload implements Externalizable {
- private static final long serialVersionUID = 1L;
-
- private transient byte[] serializedPayload;
-
- public ModificationPayload() {
- }
-
- public ModificationPayload(Modification from) throws IOException {
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
- ObjectOutputStream out = new ObjectOutputStream(bos);
- out.writeObject(from);
- out.close();
- serializedPayload = bos.toByteArray();
- }
-
- public Modification getModification() throws IOException, ClassNotFoundException {
- ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(serializedPayload));
- Modification to = (Modification) in.readObject();
- in.close();
- return to;
- }
-
- @Override
- public int size() {
- return serializedPayload.length;
- }
-
- @Override
- public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- int size = in.readInt();
- serializedPayload = new byte[size];
- in.readFully(serializedPayload);
- }
-
- @Override
- public void writeExternal(ObjectOutput out) throws IOException {
- out.writeInt(serializedPayload.length);
- out.write(serializedPayload);
- }
-
- @Override
- @Deprecated
- public <T> Map<GeneratedExtension, T> encode() {
- return null;
- }
-
- @Override
- @Deprecated
- public Payload decode(ReplicatedLogEntry.Payload payload) {
- return null;
- }
-}
import java.io.IOException;
import org.junit.Before;
import org.junit.Test;
-import org.opendaylight.controller.cluster.datastore.modification.ModificationPayload;
import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
coordinator.applyCurrentLogRecoveryBatch();
}
- @Test
- public void testAppendRecoveredLogEntryModificationPayload() throws IOException {
- final ShardRecoveryCoordinator coordinator = new ShardRecoveryCoordinator(peopleDataTree,
- peopleSchemaContext, null, "foobar", LoggerFactory.getLogger("foo"));
- coordinator.startLogRecoveryBatch(10);
- try {
- final MutableCompositeModification modification = new MutableCompositeModification((short) 1);
- modification.addModification(new WriteModification(CarsModel.BASE_PATH, CarsModel.create()));
- coordinator.appendRecoveredLogEntry(new ModificationPayload(modification));
- } catch(final SchemaValidationFailedException e){
- fail("SchemaValidationFailedException should not happen if pruning is done");
- }
- }
-
@Test
public void testAppendRecoveredLogEntryCompositeModificationPayload() throws IOException {
final ShardRecoveryCoordinator coordinator = new ShardRecoveryCoordinator(peopleDataTree,
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.Uninterruptibles;
-import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
import org.opendaylight.controller.cluster.datastore.modification.Modification;
-import org.opendaylight.controller.cluster.datastore.modification.ModificationPayload;
import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType;
@Test
public void testApplySnapshot() throws Exception {
- ShardTestKit testkit = new ShardTestKit(getSystem());
-
final TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(),
"testApplySnapshot");
- testkit.waitUntilLeader(shard);
+ ShardTestKit.waitUntilLeader(shard);
final DataTree store = InMemoryDataTreeFactory.getInstance().create(TreeType.OPERATIONAL);
store.setSchemaContext(SCHEMA_CONTEXT);
@Test
public void testApplyState() throws Exception {
-
- ShardTestKit testkit = new ShardTestKit(getSystem());
-
- final TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testApplyState");
-
- testkit.waitUntilLeader(shard);
-
- final NormalizedNode<?, ?> node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
-
- final ApplyState applyState = new ApplyState(null, "test", new ReplicatedLogImplEntry(1, 2,
- newModificationPayload(new WriteModification(TestModel.TEST_PATH, node))));
-
- shard.underlyingActor().onReceiveCommand(applyState);
-
- final NormalizedNode<?,?> actual = readStore(shard, TestModel.TEST_PATH);
- assertEquals("Applied state", node, actual);
-
- shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
- }
-
- @Test
- public void testApplyStateWithCandidatePayload() throws Exception {
-
- ShardTestKit testkit = new ShardTestKit(getSystem());
-
final TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testApplyState");
- testkit.waitUntilLeader(shard);
+ ShardTestKit.waitUntilLeader(shard);
final NormalizedNode<?, ?> node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
- final DataTreeCandidate candidate = DataTreeCandidates.fromNormalizedNode(TestModel.TEST_PATH, node);
final ApplyState applyState = new ApplyState(null, "test", new ReplicatedLogImplEntry(1, 2,
- DataTreeCandidatePayload.create(candidate)));
+ newDataTreeCandidatePayload(new WriteModification(TestModel.TEST_PATH, node))));
shard.underlyingActor().onReceiveCommand(applyState);
InMemoryJournal.addEntry(shardID.toString(), 0, DUMMY_DATA);
- InMemoryJournal.addEntry(shardID.toString(), 1, new ReplicatedLogImplEntry(0, 1, newModificationPayload(
- new WriteModification(TestModel.OUTER_LIST_PATH,
- ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build()))));
+ ShardDataTree shardDataTree = new ShardDataTree(SCHEMA_CONTEXT, TreeType.CONFIGURATION);
+
+ InMemoryJournal.addEntry(shardID.toString(), 1, new ReplicatedLogImplEntry(0, 1, newDataTreeCandidatePayload(
+ shardDataTree,
+ new WriteModification(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)),
+ new WriteModification(TestModel.OUTER_LIST_PATH,
+ ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build()))));
final int nListEntries = 16;
final Set<Integer> listEntryKeys = new HashSet<>();
final Modification mod = new MergeModification(path,
ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
InMemoryJournal.addEntry(shardID.toString(), i + 1, new ReplicatedLogImplEntry(i, 1,
- newModificationPayload(mod)));
+ newDataTreeCandidatePayload(shardDataTree, mod)));
}
InMemoryJournal.addEntry(shardID.toString(), nListEntries + 2,
testRecovery(listEntryKeys);
}
- private static ModificationPayload newModificationPayload(final Modification... mods) throws IOException {
- final MutableCompositeModification compMod = new MutableCompositeModification();
+ private static DataTreeCandidatePayload newDataTreeCandidatePayload(final Modification... mods) throws Exception {
+ return newDataTreeCandidatePayload(new ShardDataTree(SCHEMA_CONTEXT, TreeType.CONFIGURATION), mods);
+ }
+
+ private static DataTreeCandidatePayload newDataTreeCandidatePayload(ShardDataTree shardDataTree,
+ final Modification... mods) throws Exception {
+ DataTreeModification dataTreeModification = shardDataTree.newModification();
for(final Modification mod: mods) {
- compMod.addModification(mod);
+ mod.apply(dataTreeModification);
}
- return new ModificationPayload(compMod);
+ return DataTreeCandidatePayload.create(shardDataTree.commit(dataTreeModification));
}
@Test
import akka.testkit.TestActorRef;
import akka.util.Timeout;
import com.google.common.base.Optional;
-import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
import org.opendaylight.controller.cluster.datastore.modification.Modification;
-import org.opendaylight.controller.cluster.datastore.modification.ModificationPayload;
import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
return new CompositeModificationByteStringPayload(compMod.toSerializable());
}
- private static ModificationPayload newModificationPayload(final Modification... mods) throws IOException {
- MutableCompositeModification compMod = new MutableCompositeModification();
- for(Modification mod: mods) {
- compMod.addModification(mod);
- }
-
- return new ModificationPayload(compMod);
- }
-
@Test
public void testApplyHelium2VersionSnapshot() throws Exception {
TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(),
+++ /dev/null
-/*
- * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.datastore.modification;
-
-import static org.junit.Assert.assertEquals;
-import org.apache.commons.lang3.SerializationUtils;
-import org.junit.Test;
-import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
-import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
-
-/**
- * Unit tests for ModificationPayload.
- *
- * @author Thomas Pantelis
- */
-public class ModificationPayloadTest {
-
- @Test
- public void test() throws Exception {
-
- YangInstanceIdentifier writePath = TestModel.TEST_PATH;
- NormalizedNode<?, ?> writeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
- new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
- withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
-
- MutableCompositeModification compositeModification = new MutableCompositeModification();
- compositeModification.addModification(new WriteModification(writePath, writeData));
-
- ModificationPayload payload = new ModificationPayload(compositeModification);
-
- MutableCompositeModification deserialized = (MutableCompositeModification) payload.getModification();
-
- assertEquals("getModifications size", 1, deserialized.getModifications().size());
- WriteModification write = (WriteModification)deserialized.getModifications().get(0);
- assertEquals("getPath", writePath, write.getPath());
- assertEquals("getData", writeData, write.getData());
-
- ModificationPayload cloned = SerializationUtils.clone(payload);
-
- deserialized = (MutableCompositeModification) payload.getModification();
-
- assertEquals("getModifications size", 1, deserialized.getModifications().size());
- write = (WriteModification)deserialized.getModifications().get(0);
- assertEquals("getPath", writePath, write.getPath());
- assertEquals("getData", writeData, write.getData());
- }
-}
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import com.google.common.base.Optional;
-import com.google.common.base.Stopwatch;
import org.apache.commons.lang.SerializationUtils;
-import org.junit.Ignore;
import org.junit.Test;
import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
assertEquals("getVersion", DataStoreVersions.CURRENT_VERSION, delete.getVersion());
assertEquals("getPath", deletePath, delete.getPath());
}
-
- @Test
- @Ignore
- public void testSerializationScale() throws Exception {
- YangInstanceIdentifier writePath = TestModel.TEST_PATH;
- NormalizedNode<?, ?> writeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
- new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
- withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
-
- MutableCompositeModification compositeModification = new MutableCompositeModification();
- for(int i = 0; i < 1000; i++) {
- compositeModification.addModification(new WriteModification(writePath, writeData));
- }
-
- Stopwatch sw = Stopwatch.createStarted();
- for(int i = 0; i < 1000; i++) {
- new ModificationPayload(compositeModification);
- }
-
- sw.stop();
- System.out.println("Elapsed: "+sw);
-
- ModificationPayload p = new ModificationPayload(compositeModification);
- sw.start();
- for(int i = 0; i < 1000; i++) {
- p.getModification();
- }
-
- sw.stop();
- System.out.println("Elapsed: "+sw);
- }
}