import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
import org.opendaylight.controller.cluster.datastore.messages.WriteData;
import org.opendaylight.controller.cluster.datastore.messages.WriteDataReply;
-import org.opendaylight.controller.cluster.datastore.modification.CompositeModification;
-import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
-import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
import org.opendaylight.controller.cluster.datastore.modification.Modification;
-import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
-import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
/**
* @author: syedbahm
*/
public class ShardWriteTransaction extends ShardTransaction {
- private final MutableCompositeModification compositeModification = new MutableCompositeModification();
private int totalBatchedModificationsReceived;
private Exception lastBatchedModificationsException;
private final ReadWriteShardDataTreeTransaction transaction;
} else if(DeleteData.isSerializedType(message)) {
deleteData(DeleteData.fromSerializable(message), SERIALIZED_REPLY);
-
- } else if (message instanceof GetCompositedModification) {
- // This is here for testing only
- getSender().tell(new GetCompositeModificationReply(compositeModification), getSelf());
} else {
super.handleReceive(message);
}
try {
for(Modification modification: batched.getModifications()) {
- compositeModification.addModification(modification);
modification.apply(transaction.getSnapshot());
}
return;
}
- compositeModification.addModification(
- new WriteModification(message.getPath(), message.getData()));
try {
transaction.getSnapshot().write(message.getPath(), message.getData());
WriteDataReply writeDataReply = WriteDataReply.INSTANCE;
return;
}
- compositeModification.addModification(
- new MergeModification(message.getPath(), message.getData()));
-
try {
transaction.getSnapshot().merge(message.getPath(), message.getData());
MergeDataReply mergeDataReply = MergeDataReply.INSTANCE;
return;
}
- compositeModification.addModification(new DeleteModification(message.getPath()));
try {
transaction.getSnapshot().delete(message.getPath());
DeleteDataReply deleteDataReply = DeleteDataReply.INSTANCE;
// The shard will handle the commit from here so we're no longer needed - self-destruct.
getSelf().tell(PoisonPill.getInstance(), getSelf());
}
-
- // These classes are in here for test purposes only
-
- static class GetCompositedModification {
- }
-
- static class GetCompositeModificationReply {
- private final CompositeModification modification;
-
-
- GetCompositeModificationReply(CompositeModification modification) {
- this.modification = modification;
- }
-
- public CompositeModification getModification() {
- return modification;
- }
- }
}
import org.junit.Test;
import org.mockito.InOrder;
import org.mockito.Mockito;
-import org.opendaylight.controller.cluster.datastore.ShardWriteTransaction.GetCompositeModificationReply;
import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.WriteData;
import org.opendaylight.controller.cluster.datastore.messages.WriteDataReply;
-import org.opendaylight.controller.cluster.datastore.modification.CompositeModification;
import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
-import org.opendaylight.controller.cluster.datastore.modification.Modification;
import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec.Encoded;
}};
}
- private void assertModification(final ActorRef subject,
- final Class<? extends Modification> modificationType) {
- new JavaTestKit(getSystem()) {{
- subject.tell(new ShardWriteTransaction.GetCompositedModification(), getRef());
-
- CompositeModification compositeModification = expectMsgClass(duration("3 seconds"),
- GetCompositeModificationReply.class).getModification();
-
- assertTrue(compositeModification.getModifications().size() == 1);
- assertEquals(modificationType, compositeModification.getModifications().get(0).getClass());
- }};
- }
-
@Test
public void testOnReceiveWriteData() {
new JavaTestKit(getSystem()) {{
expectMsgClass(duration("5 seconds"), ShardTransactionMessages.WriteDataReply.class);
- assertModification(transaction, WriteModification.class);
-
// unserialized write
transaction.tell(new WriteData(TestModel.TEST_PATH,
ImmutableNodes.containerNode(TestModel.TEST_QNAME), DataStoreVersions.CURRENT_VERSION),
transaction.tell(serialized, getRef());
expectMsgClass(duration("5 seconds"), ShardTransactionMessages.WriteDataReply.class);
-
- assertModification(transaction, WriteModification.class);
}};
}
expectMsgClass(duration("5 seconds"), ShardTransactionMessages.MergeDataReply.class);
- assertModification(transaction, MergeModification.class);
-
//unserialized merge
transaction.tell(new MergeData(TestModel.TEST_PATH,
ImmutableNodes.containerNode(TestModel.TEST_QNAME), DataStoreVersions.CURRENT_VERSION),
transaction.tell(serialized, getRef());
expectMsgClass(duration("5 seconds"), ShardTransactionMessages.MergeDataReply.class);
-
- assertModification(transaction, MergeModification.class);
}};
}
expectMsgClass(duration("5 seconds"), ShardTransactionMessages.DeleteDataReply.class);
- assertModification(transaction, DeleteModification.class);
-
//unserialized
transaction.tell(new DeleteData(TestModel.TEST_PATH, DataStoreVersions.CURRENT_VERSION), getRef());
BatchedModificationsReply reply = expectMsgClass(duration("5 seconds"), BatchedModificationsReply.class);
assertEquals("getNumBatched", 3, reply.getNumBatched());
- JavaTestKit verification = new JavaTestKit(getSystem());
- transaction.tell(new ShardWriteTransaction.GetCompositedModification(), verification.getRef());
-
- CompositeModification compositeModification = verification.expectMsgClass(duration("5 seconds"),
- GetCompositeModificationReply.class).getModification();
-
- assertEquals("CompositeModification size", 3, compositeModification.getModifications().size());
-
- WriteModification write = (WriteModification)compositeModification.getModifications().get(0);
- assertEquals("getPath", writePath, write.getPath());
- assertEquals("getData", writeData, write.getData());
-
- MergeModification merge = (MergeModification)compositeModification.getModifications().get(1);
- assertEquals("getPath", mergePath, merge.getPath());
- assertEquals("getData", mergeData, merge.getData());
-
- DeleteModification delete = (DeleteModification)compositeModification.getModifications().get(2);
- assertEquals("getPath", deletePath, delete.getPath());
-
InOrder inOrder = Mockito.inOrder(mockModification);
inOrder.verify(mockModification).write(writePath, writeData);
inOrder.verify(mockModification).merge(mergePath, mergeData);