import akka.event.Logging;
import akka.event.LoggingAdapter;
import akka.japi.Creator;
+import akka.persistence.Persistent;
import akka.persistence.UntypedProcessor;
+import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
+import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionChain;
import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionChainReply;
+import org.opendaylight.controller.cluster.datastore.messages.ForwardedCommitTransaction;
import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
+import org.opendaylight.controller.cluster.datastore.modification.Modification;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
/**
private final InMemoryDOMDataStore store;
+ private final Map<Modification, DOMStoreThreePhaseCommitCohort> modificationToCohort = new HashMap<>();
+
private final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
private Shard(String name){
registerChangeListener((RegisterChangeListener) message);
} else if (message instanceof UpdateSchemaContext) {
updateSchemaContext((UpdateSchemaContext) message);
+ } else if (message instanceof ForwardedCommitTransaction ) {
+ handleForwardedCommit((ForwardedCommitTransaction) message);
+ } else if (message instanceof Persistent){
+ commit((Persistent) message);
+ }
+ }
+
+ private void commit(Persistent message) {
+ Modification modification = (Modification) message.payload();
+ DOMStoreThreePhaseCommitCohort cohort = modificationToCohort.remove(modification);
+ if(cohort == null){
+ log.error("Could not find cohort for modification : " + modification);
+ return;
}
+ final ListenableFuture<Void> future = cohort.commit();
+ final ActorRef sender = getSender();
+ final ActorRef self = getSelf();
+ future.addListener(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ future.get();
+ sender.tell(new CommitTransactionReply(), self);
+ } catch (InterruptedException | ExecutionException e) {
+ log.error(e, "An exception happened when committing");
+ }
+ }
+ }, getContext().dispatcher());
+ }
+
+ private void handleForwardedCommit(ForwardedCommitTransaction message) {
+ log.info("received forwarded transaction");
+ modificationToCohort.put(message.getModification(), message.getCohort());
+ getSelf().forward(Persistent.create(message.getModification()), getContext());
}
private void updateSchemaContext(UpdateSchemaContext message) {
*/
public class ShardTransaction extends UntypedActor {
+ private final ActorRef shardActor;
+
private final DOMStoreReadWriteTransaction transaction;
private final MutableCompositeModification modification = new MutableCompositeModification();
private final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
- public ShardTransaction(DOMStoreReadWriteTransaction transaction) {
+ public ShardTransaction(DOMStoreReadWriteTransaction transaction, ActorRef shardActor) {
this.transaction = transaction;
+ this.shardActor = shardActor;
}
- public static Props props(final DOMStoreReadWriteTransaction transaction){
+ public static Props props(final DOMStoreReadWriteTransaction transaction, final ActorRef shardActor){
return Props.create(new Creator<ShardTransaction>(){
@Override
public ShardTransaction create() throws Exception {
- return new ShardTransaction(transaction);
+ return new ShardTransaction(transaction, shardActor);
}
});
}
private void readyTransaction(ReadyTransaction message){
DOMStoreThreePhaseCommitCohort cohort = transaction.ready();
- ActorRef cohortActor = getContext().actorOf(ThreePhaseCommitCohort.props(cohort));
+ ActorRef cohortActor = getContext().actorOf(ThreePhaseCommitCohort.props(cohort, shardActor, modification));
getSender().tell(new ReadyTransactionReply(cohortActor.path()), getSelf());
}
public void onReceive(Object message) throws Exception {
if(message instanceof CreateTransaction){
DOMStoreReadWriteTransaction transaction = chain.newReadWriteTransaction();
- ActorRef transactionActor = getContext().actorOf(ShardTransaction.props(transaction));
+ ActorRef transactionActor = getContext().actorOf(ShardTransaction.props(transaction, getContext().parent()));
getSender().tell(new CreateTransactionReply(transactionActor.path()), getSelf());
} else if (message instanceof CloseTransactionChain){
chain.close();
package org.opendaylight.controller.cluster.datastore;
+import akka.actor.ActorRef;
import akka.actor.Props;
import akka.actor.UntypedActor;
+import akka.event.Logging;
+import akka.event.LoggingAdapter;
import akka.japi.Creator;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.ForwardedCommitTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransactionReply;
+import org.opendaylight.controller.cluster.datastore.modification.CompositeModification;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+import java.util.concurrent.ExecutionException;
+
public class ThreePhaseCommitCohort extends UntypedActor{
private final DOMStoreThreePhaseCommitCohort cohort;
+ private final ActorRef shardActor;
+ private final CompositeModification modification;
- public ThreePhaseCommitCohort(DOMStoreThreePhaseCommitCohort cohort) {
-
+ public ThreePhaseCommitCohort(DOMStoreThreePhaseCommitCohort cohort, ActorRef shardActor, CompositeModification modification) {
this.cohort = cohort;
+ this.shardActor = shardActor;
+ this.modification = modification;
}
- @Override
- public void onReceive(Object message) throws Exception {
- throw new UnsupportedOperationException("onReceive");
- }
+ private final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
- public static Props props(final DOMStoreThreePhaseCommitCohort cohort) {
+ public static Props props(final DOMStoreThreePhaseCommitCohort cohort, final ActorRef shardActor, final CompositeModification modification) {
return Props.create(new Creator<ThreePhaseCommitCohort>(){
@Override
public ThreePhaseCommitCohort create() throws Exception {
- return new ThreePhaseCommitCohort(cohort);
+ return new ThreePhaseCommitCohort(cohort, shardActor, modification);
}
});
}
+
+ @Override
+ public void onReceive(Object message) throws Exception {
+ if(message instanceof CanCommitTransaction){
+ canCommit((CanCommitTransaction) message);
+ } else if(message instanceof PreCommitTransaction) {
+ preCommit((PreCommitTransaction) message);
+ } else if(message instanceof CommitTransaction){
+ commit((CommitTransaction) message);
+ } else if (message instanceof AbortTransaction){
+ abort((AbortTransaction) message);
+ }
+ }
+
+ private void abort(AbortTransaction message) {
+ final ListenableFuture<Void> future = cohort.abort();
+ final ActorRef sender = getSender();
+ final ActorRef self = getSelf();
+
+ future.addListener(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ future.get();
+ sender.tell(new AbortTransactionReply(), self);
+ } catch (InterruptedException | ExecutionException e) {
+ log.error(e, "An exception happened when aborting");
+ }
+ }
+ }, getContext().dispatcher());
+ }
+
+ private void commit(CommitTransaction message) {
+ // Forward the commit to the shard
+ log.info("Commit transaction now + " + shardActor);
+ shardActor.forward(new ForwardedCommitTransaction(cohort, modification), getContext());
+
+ }
+
+ private void preCommit(PreCommitTransaction message) {
+ final ListenableFuture<Void> future = cohort.preCommit();
+ final ActorRef sender = getSender();
+ final ActorRef self = getSelf();
+
+ future.addListener(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ future.get();
+ sender.tell(new PreCommitTransactionReply(), self);
+ } catch (InterruptedException | ExecutionException e) {
+ log.error(e, "An exception happened when preCommitting");
+ }
+ }
+ }, getContext().dispatcher());
+
+ }
+
+ private void canCommit(CanCommitTransaction message) {
+ final ListenableFuture<Boolean> future = cohort.canCommit();
+ final ActorRef sender = getSender();
+ final ActorRef self = getSelf();
+
+ future.addListener(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ Boolean canCommit = future.get();
+ sender.tell(new CanCommitTransactionReply(canCommit), self);
+ } catch (InterruptedException | ExecutionException e) {
+ log.error(e, "An exception happened when aborting");
+ }
+ }
+ }, getContext().dispatcher());
+
+ }
}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.messages;
+
+public class AbortTransaction {
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.messages;
+
+public class AbortTransactionReply {
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.messages;
+
+public class CanCommitTransaction {
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.messages;
+
+public class CanCommitTransactionReply {
+ private final Boolean canCommit;
+
+ public CanCommitTransactionReply(Boolean canCommit) {
+ this.canCommit = canCommit;
+ }
+
+ public Boolean getCanCommit() {
+ return canCommit;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.messages;
+
+public class CommitTransaction {
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.messages;
+
+public class CommitTransactionReply {
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import org.opendaylight.controller.cluster.datastore.modification.Modification;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+
+public class ForwardedCommitTransaction {
+ private final DOMStoreThreePhaseCommitCohort cohort;
+ private final Modification modification;
+
+ public ForwardedCommitTransaction(DOMStoreThreePhaseCommitCohort cohort, Modification modification){
+ this.cohort = cohort;
+ this.modification = modification;
+ }
+
+ public DOMStoreThreePhaseCommitCohort getCohort() {
+ return cohort;
+ }
+
+ public Modification getModification() {
+ return modification;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.messages;
+
+public class PreCommitTransaction {
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.messages;
+
+public class PreCommitTransactionReply {
+}
import akka.actor.ActorPath;
public class ReadyTransactionReply {
- private final ActorPath path;
+ private final ActorPath cohortPath;
- public ReadyTransactionReply(ActorPath path) {
+ public ReadyTransactionReply(ActorPath cohortPath) {
- this.path = path;
+ this.cohortPath = cohortPath;
}
- public ActorPath getPath() {
- return path;
+ public ActorPath getCohortPath() {
+ return cohortPath;
}
}
package org.opendaylight.controller.cluster.datastore.modification;
+
import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+import java.io.Serializable;
+
/**
* Base class to be used for all simple modifications that can be applied to a DOMStoreTransaction
*/
-public abstract class AbstractModification implements Modification {
- protected final InstanceIdentifier path;
+public abstract class AbstractModification implements Modification,
+ Serializable {
+
+ private static final long serialVersionUID = 1638042650152084457L;
+
+ protected final InstanceIdentifier path;
- protected AbstractModification(InstanceIdentifier path) {
- this.path = path;
- }
+ protected AbstractModification(InstanceIdentifier path) {
+ this.path = path;
+ }
}
* </p>
*/
public interface CompositeModification extends Modification {
- List<Modification> getModifications();
+ /**
+ * Get a list of Modifications contained by this Composite
+ * @return
+ */
+ List<Modification> getModifications();
}
import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
+import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
-public class MutableCompositeModification implements CompositeModification {
- private final List<Modification> modifications = new ArrayList<>();
+/**
+ * MutableCompositeModification is just a mutable version of a
+ * CompositeModification {@link org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification#addModification(Modification)}
+ */
+public class MutableCompositeModification
+ implements CompositeModification, Serializable {
- @Override
- public void apply(DOMStoreWriteTransaction transaction) {
- for(Modification modification : modifications){
- modification.apply(transaction);
- }
- }
+ private static final long serialVersionUID = 1163377899140186790L;
+
+ private final List<Modification> modifications = new ArrayList<>();
- public void addModification(Modification modification){
- modifications.add(modification);
- }
+ @Override
+ public void apply(DOMStoreWriteTransaction transaction) {
+ for (Modification modification : modifications) {
+ modification.apply(transaction);
+ }
+ }
- public List<Modification> getModifications(){
- return Collections.unmodifiableList(modifications);
- }
+ /**
+ * Add a new Modification to the list of Modifications represented by this
+ * composite
+ *
+ * @param modification
+ */
+ public void addModification(Modification modification) {
+ modifications.add(modification);
+ }
+ public List<Modification> getModifications() {
+ return Collections.unmodifiableList(modifications);
+ }
}
* WriteModification stores all the parameters required to write data to the specified path
*/
public class WriteModification extends AbstractModification {
- private final NormalizedNode data;
+ private final NormalizedNode data;
public WriteModification(InstanceIdentifier path, NormalizedNode data) {
super(path);
public void apply(DOMStoreWriteTransaction transaction) {
transaction.write(path, data);
}
+
}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore;
+
+import akka.actor.ActorPath;
+import akka.actor.ActorRef;
+import akka.actor.ActorSelection;
+import akka.actor.Props;
+import akka.testkit.JavaTestKit;
+import junit.framework.Assert;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionChain;
+import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionChainReply;
+import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
+import org.opendaylight.controller.cluster.datastore.messages.WriteData;
+import org.opendaylight.controller.cluster.datastore.messages.WriteDataReply;
+import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+
+public class BasicIntegrationTest extends AbstractActorTest {
+
+ @Test
+ public void integrationTest() {
+ // This test will
+ // - create a Shard
+ // - initiate a transaction
+ // - write something
+ // - read the transaction for commit
+ // - commit the transaction
+
+
+ new JavaTestKit(getSystem()) {{
+ final Props props = Shard.props("config");
+ final ActorRef shard = getSystem().actorOf(props);
+
+ new Within(duration("5 seconds")) {
+ protected void run() {
+
+ shard.tell(
+ new UpdateSchemaContext(TestModel.createTestContext()),
+ getRef());
+
+ shard.tell(new CreateTransactionChain(), getRef());
+
+ final ActorSelection transactionChain =
+ new ExpectMsg<ActorSelection>("match hint") {
+ protected ActorSelection match(Object in) {
+ if (in instanceof CreateTransactionChainReply) {
+ ActorPath transactionChainPath =
+ ((CreateTransactionChainReply) in)
+ .getTransactionChainPath();
+ return getSystem()
+ .actorSelection(transactionChainPath);
+ } else {
+ throw noMatch();
+ }
+ }
+ }.get(); // this extracts the received message
+
+ Assert.assertNotNull(transactionChain);
+
+ transactionChain.tell(new CreateTransaction(), getRef());
+
+ final ActorSelection transaction =
+ new ExpectMsg<ActorSelection>("match hint") {
+ protected ActorSelection match(Object in) {
+ if (in instanceof CreateTransactionReply) {
+ ActorPath transactionPath =
+ ((CreateTransactionReply) in)
+ .getTransactionPath();
+ return getSystem()
+ .actorSelection(transactionPath);
+ } else {
+ throw noMatch();
+ }
+ }
+ }.get(); // this extracts the received message
+
+ Assert.assertNotNull(transaction);
+
+ transaction.tell(new WriteData(TestModel.TEST_PATH,
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME)),
+ getRef());
+
+ Boolean writeDone = new ExpectMsg<Boolean>("match hint") {
+ protected Boolean match(Object in) {
+ if (in instanceof WriteDataReply) {
+ return true;
+ } else {
+ throw noMatch();
+ }
+ }
+ }.get(); // this extracts the received message
+
+ Assert.assertTrue(writeDone);
+
+ transaction.tell(new ReadyTransaction(), getRef());
+
+ final ActorSelection cohort =
+ new ExpectMsg<ActorSelection>("match hint") {
+ protected ActorSelection match(Object in) {
+ if (in instanceof ReadyTransactionReply) {
+ ActorPath cohortPath =
+ ((ReadyTransactionReply) in)
+ .getCohortPath();
+ return getSystem()
+ .actorSelection(cohortPath);
+ } else {
+ throw noMatch();
+ }
+ }
+ }.get(); // this extracts the received message
+
+ Assert.assertNotNull(cohort);
+
+ cohort.tell(new PreCommitTransaction(), getRef());
+
+ Boolean preCommitDone =
+ new ExpectMsg<Boolean>("match hint") {
+ protected Boolean match(Object in) {
+ if (in instanceof PreCommitTransactionReply) {
+ return true;
+ } else {
+ throw noMatch();
+ }
+ }
+ }.get(); // this extracts the received message
+
+ Assert.assertTrue(preCommitDone);
+
+ cohort.tell(new CommitTransaction(), getRef());
+
+ final Boolean commitDone =
+ new ExpectMsg<Boolean>("match hint") {
+ protected Boolean match(Object in) {
+ if (in instanceof CommitTransactionReply) {
+ return true;
+ } else {
+ throw noMatch();
+ }
+ }
+ }.get(); // this extracts the received message
+
+ Assert.assertTrue(commitDone);
+
+ }
+
+
+ };
+ }};
+
+
+ }
+}
}};
}
+
+
private AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>> noOpDataChangeListener(){
return new AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>>() {
@Override
@Test
public void testOnReceiveReadData() throws Exception {
new JavaTestKit(getSystem()) {{
- final Props props = ShardTransaction.props(store.newReadWriteTransaction());
+ final ActorRef shard = getSystem().actorOf(Shard.props("config"));
+ final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard);
final ActorRef subject = getSystem().actorOf(props, "testReadData");
new Within(duration("1 seconds")) {
@Test
public void testOnReceiveWriteData() throws Exception {
new JavaTestKit(getSystem()) {{
- final Props props = ShardTransaction.props(store.newReadWriteTransaction());
+ final ActorRef shard = getSystem().actorOf(Shard.props("config"));
+ final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard);
final ActorRef subject = getSystem().actorOf(props, "testWriteData");
new Within(duration("1 seconds")) {
@Test
public void testOnReceiveMergeData() throws Exception {
new JavaTestKit(getSystem()) {{
- final Props props = ShardTransaction.props(store.newReadWriteTransaction());
+ final ActorRef shard = getSystem().actorOf(Shard.props("config"));
+ final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard);
final ActorRef subject = getSystem().actorOf(props, "testMergeData");
new Within(duration("1 seconds")) {
@Test
public void testOnReceiveDeleteData() throws Exception {
new JavaTestKit(getSystem()) {{
- final Props props = ShardTransaction.props(store.newReadWriteTransaction());
+ final ActorRef shard = getSystem().actorOf(Shard.props("config"));
+ final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard);
final ActorRef subject = getSystem().actorOf(props, "testDeleteData");
new Within(duration("1 seconds")) {
@Test
public void testOnReceiveReadyTransaction() throws Exception {
new JavaTestKit(getSystem()) {{
- final Props props = ShardTransaction.props(store.newReadWriteTransaction());
+ final ActorRef shard = getSystem().actorOf(Shard.props("config"));
+ final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard);
final ActorRef subject = getSystem().actorOf(props, "testReadyTransaction");
new Within(duration("1 seconds")) {
@Test
public void testOnReceiveCloseTransaction() throws Exception {
new JavaTestKit(getSystem()) {{
- final Props props = ShardTransaction.props(store.newReadWriteTransaction());
+ final ActorRef shard = getSystem().actorOf(Shard.props("config"));
+ final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard);
final ActorRef subject = getSystem().actorOf(props, "testCloseTransaction");
new Within(duration("1 seconds")) {
--- /dev/null
+akka {
+ actor {
+ serializers {
+ java = "akka.serialization.JavaSerializer"
+ }
+
+ serialization-bindings {
+ "org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification" = java
+ }
+ }
+}
\ No newline at end of file