From: Moiz Raja Date: Mon, 23 Jun 2014 23:25:41 +0000 (-0700) Subject: Implement commiting of data X-Git-Tag: release/helium~578 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=da6ca13e3f8ecb73a62f7e8f9ae3bdcae9d851f8 Implement commiting of data - Implement ThreePhaseCommitCohort Actor - Implement a BasicIntegrationTest to test out using a Shard upto committing - Make modifications in Shard, ShardTransaction, ShardTransactionChain to make the flow work Change-Id: I4eff32833c09d89f81753db29ea38ac26b9dfbf6 Signed-off-by: Moiz Raja --- diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index f96cb14510..d75edc7922 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -13,20 +13,29 @@ import akka.actor.Props; import akka.event.Logging; import akka.event.LoggingAdapter; import akka.japi.Creator; +import akka.persistence.Persistent; import akka.persistence.UntypedProcessor; +import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; +import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply; import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionChain; import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionChainReply; +import org.opendaylight.controller.cluster.datastore.messages.ForwardedCommitTransaction; import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener; import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply; import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext; +import org.opendaylight.controller.cluster.datastore.modification.Modification; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener; import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain; import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; /** @@ -43,6 +52,8 @@ public class Shard extends UntypedProcessor { private final InMemoryDOMDataStore store; + private final Map modificationToCohort = new HashMap<>(); + private final LoggingAdapter log = Logging.getLogger(getContext().system(), this); private Shard(String name){ @@ -68,7 +79,40 @@ public class Shard extends UntypedProcessor { registerChangeListener((RegisterChangeListener) message); } else if (message instanceof UpdateSchemaContext) { updateSchemaContext((UpdateSchemaContext) message); + } else if (message instanceof ForwardedCommitTransaction ) { + handleForwardedCommit((ForwardedCommitTransaction) message); + } else if (message instanceof Persistent){ + commit((Persistent) message); + } + } + + private void commit(Persistent message) { + Modification modification = (Modification) message.payload(); + DOMStoreThreePhaseCommitCohort cohort = modificationToCohort.remove(modification); + if(cohort == null){ + log.error("Could not find cohort for modification : " + modification); + return; } + final ListenableFuture future = cohort.commit(); + final ActorRef sender = getSender(); + final ActorRef self = getSelf(); + future.addListener(new Runnable() { + @Override + public void run() { + try { + future.get(); + sender.tell(new CommitTransactionReply(), self); + } catch (InterruptedException | ExecutionException e) { + log.error(e, "An exception happened when committing"); + } + } + }, getContext().dispatcher()); + } + + private void handleForwardedCommit(ForwardedCommitTransaction message) { + log.info("received forwarded transaction"); + modificationToCohort.put(message.getModification(), message.getCohort()); + getSelf().forward(Persistent.create(message.getModification()), getContext()); } private void updateSchemaContext(UpdateSchemaContext message) { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransaction.java index f43dd7b52a..75744cad5b 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransaction.java @@ -65,23 +65,26 @@ import java.util.concurrent.ExecutionException; */ public class ShardTransaction extends UntypedActor { + private final ActorRef shardActor; + private final DOMStoreReadWriteTransaction transaction; private final MutableCompositeModification modification = new MutableCompositeModification(); private final LoggingAdapter log = Logging.getLogger(getContext().system(), this); - public ShardTransaction(DOMStoreReadWriteTransaction transaction) { + public ShardTransaction(DOMStoreReadWriteTransaction transaction, ActorRef shardActor) { this.transaction = transaction; + this.shardActor = shardActor; } - public static Props props(final DOMStoreReadWriteTransaction transaction){ + public static Props props(final DOMStoreReadWriteTransaction transaction, final ActorRef shardActor){ return Props.create(new Creator(){ @Override public ShardTransaction create() throws Exception { - return new ShardTransaction(transaction); + return new ShardTransaction(transaction, shardActor); } }); } @@ -151,7 +154,7 @@ public class ShardTransaction extends UntypedActor { private void readyTransaction(ReadyTransaction message){ DOMStoreThreePhaseCommitCohort cohort = transaction.ready(); - ActorRef cohortActor = getContext().actorOf(ThreePhaseCommitCohort.props(cohort)); + ActorRef cohortActor = getContext().actorOf(ThreePhaseCommitCohort.props(cohort, shardActor, modification)); getSender().tell(new ReadyTransactionReply(cohortActor.path()), getSelf()); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransactionChain.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransactionChain.java index 83913fe416..79aaa86b28 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransactionChain.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransactionChain.java @@ -34,7 +34,7 @@ public class ShardTransactionChain extends UntypedActor{ public void onReceive(Object message) throws Exception { if(message instanceof CreateTransaction){ DOMStoreReadWriteTransaction transaction = chain.newReadWriteTransaction(); - ActorRef transactionActor = getContext().actorOf(ShardTransaction.props(transaction)); + ActorRef transactionActor = getContext().actorOf(ShardTransaction.props(transaction, getContext().parent())); getSender().tell(new CreateTransactionReply(transactionActor.path()), getSelf()); } else if (message instanceof CloseTransactionChain){ chain.close(); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohort.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohort.java index 8e21cb2d86..61baf1ab64 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohort.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohort.java @@ -8,30 +8,121 @@ package org.opendaylight.controller.cluster.datastore; +import akka.actor.ActorRef; import akka.actor.Props; import akka.actor.UntypedActor; +import akka.event.Logging; +import akka.event.LoggingAdapter; import akka.japi.Creator; +import com.google.common.util.concurrent.ListenableFuture; +import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction; +import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply; +import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction; +import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply; +import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction; +import org.opendaylight.controller.cluster.datastore.messages.ForwardedCommitTransaction; +import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransaction; +import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransactionReply; +import org.opendaylight.controller.cluster.datastore.modification.CompositeModification; import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; +import java.util.concurrent.ExecutionException; + public class ThreePhaseCommitCohort extends UntypedActor{ private final DOMStoreThreePhaseCommitCohort cohort; + private final ActorRef shardActor; + private final CompositeModification modification; - public ThreePhaseCommitCohort(DOMStoreThreePhaseCommitCohort cohort) { - + public ThreePhaseCommitCohort(DOMStoreThreePhaseCommitCohort cohort, ActorRef shardActor, CompositeModification modification) { this.cohort = cohort; + this.shardActor = shardActor; + this.modification = modification; } - @Override - public void onReceive(Object message) throws Exception { - throw new UnsupportedOperationException("onReceive"); - } + private final LoggingAdapter log = Logging.getLogger(getContext().system(), this); - public static Props props(final DOMStoreThreePhaseCommitCohort cohort) { + public static Props props(final DOMStoreThreePhaseCommitCohort cohort, final ActorRef shardActor, final CompositeModification modification) { return Props.create(new Creator(){ @Override public ThreePhaseCommitCohort create() throws Exception { - return new ThreePhaseCommitCohort(cohort); + return new ThreePhaseCommitCohort(cohort, shardActor, modification); } }); } + + @Override + public void onReceive(Object message) throws Exception { + if(message instanceof CanCommitTransaction){ + canCommit((CanCommitTransaction) message); + } else if(message instanceof PreCommitTransaction) { + preCommit((PreCommitTransaction) message); + } else if(message instanceof CommitTransaction){ + commit((CommitTransaction) message); + } else if (message instanceof AbortTransaction){ + abort((AbortTransaction) message); + } + } + + private void abort(AbortTransaction message) { + final ListenableFuture future = cohort.abort(); + final ActorRef sender = getSender(); + final ActorRef self = getSelf(); + + future.addListener(new Runnable() { + @Override + public void run() { + try { + future.get(); + sender.tell(new AbortTransactionReply(), self); + } catch (InterruptedException | ExecutionException e) { + log.error(e, "An exception happened when aborting"); + } + } + }, getContext().dispatcher()); + } + + private void commit(CommitTransaction message) { + // Forward the commit to the shard + log.info("Commit transaction now + " + shardActor); + shardActor.forward(new ForwardedCommitTransaction(cohort, modification), getContext()); + + } + + private void preCommit(PreCommitTransaction message) { + final ListenableFuture future = cohort.preCommit(); + final ActorRef sender = getSender(); + final ActorRef self = getSelf(); + + future.addListener(new Runnable() { + @Override + public void run() { + try { + future.get(); + sender.tell(new PreCommitTransactionReply(), self); + } catch (InterruptedException | ExecutionException e) { + log.error(e, "An exception happened when preCommitting"); + } + } + }, getContext().dispatcher()); + + } + + private void canCommit(CanCommitTransaction message) { + final ListenableFuture future = cohort.canCommit(); + final ActorRef sender = getSender(); + final ActorRef self = getSelf(); + + future.addListener(new Runnable() { + @Override + public void run() { + try { + Boolean canCommit = future.get(); + sender.tell(new CanCommitTransactionReply(canCommit), self); + } catch (InterruptedException | ExecutionException e) { + log.error(e, "An exception happened when aborting"); + } + } + }, getContext().dispatcher()); + + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/AbortTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/AbortTransaction.java new file mode 100644 index 0000000000..4cf713a0b3 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/AbortTransaction.java @@ -0,0 +1,12 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.datastore.messages; + +public class AbortTransaction { +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/AbortTransactionReply.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/AbortTransactionReply.java new file mode 100644 index 0000000000..84234e5807 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/AbortTransactionReply.java @@ -0,0 +1,12 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.datastore.messages; + +public class AbortTransactionReply { +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CanCommitTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CanCommitTransaction.java new file mode 100644 index 0000000000..526d60fc75 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CanCommitTransaction.java @@ -0,0 +1,12 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.datastore.messages; + +public class CanCommitTransaction { +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CanCommitTransactionReply.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CanCommitTransactionReply.java new file mode 100644 index 0000000000..d143c14b3b --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CanCommitTransactionReply.java @@ -0,0 +1,21 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.datastore.messages; + +public class CanCommitTransactionReply { + private final Boolean canCommit; + + public CanCommitTransactionReply(Boolean canCommit) { + this.canCommit = canCommit; + } + + public Boolean getCanCommit() { + return canCommit; + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CommitTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CommitTransaction.java new file mode 100644 index 0000000000..d7b210fd03 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CommitTransaction.java @@ -0,0 +1,12 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.datastore.messages; + +public class CommitTransaction { +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CommitTransactionReply.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CommitTransactionReply.java new file mode 100644 index 0000000000..a0e5e895d2 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CommitTransactionReply.java @@ -0,0 +1,12 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.datastore.messages; + +public class CommitTransactionReply { +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ForwardedCommitTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ForwardedCommitTransaction.java new file mode 100644 index 0000000000..01049930b5 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ForwardedCommitTransaction.java @@ -0,0 +1,30 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.datastore.messages; + +import org.opendaylight.controller.cluster.datastore.modification.Modification; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; + +public class ForwardedCommitTransaction { + private final DOMStoreThreePhaseCommitCohort cohort; + private final Modification modification; + + public ForwardedCommitTransaction(DOMStoreThreePhaseCommitCohort cohort, Modification modification){ + this.cohort = cohort; + this.modification = modification; + } + + public DOMStoreThreePhaseCommitCohort getCohort() { + return cohort; + } + + public Modification getModification() { + return modification; + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/PreCommitTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/PreCommitTransaction.java new file mode 100644 index 0000000000..87a9c77a4f --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/PreCommitTransaction.java @@ -0,0 +1,12 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.datastore.messages; + +public class PreCommitTransaction { +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/PreCommitTransactionReply.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/PreCommitTransactionReply.java new file mode 100644 index 0000000000..f499c720d2 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/PreCommitTransactionReply.java @@ -0,0 +1,12 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.datastore.messages; + +public class PreCommitTransactionReply { +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ReadyTransactionReply.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ReadyTransactionReply.java index 48565d4fbb..32d31bf84d 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ReadyTransactionReply.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ReadyTransactionReply.java @@ -11,14 +11,14 @@ package org.opendaylight.controller.cluster.datastore.messages; import akka.actor.ActorPath; public class ReadyTransactionReply { - private final ActorPath path; + private final ActorPath cohortPath; - public ReadyTransactionReply(ActorPath path) { + public ReadyTransactionReply(ActorPath cohortPath) { - this.path = path; + this.cohortPath = cohortPath; } - public ActorPath getPath() { - return path; + public ActorPath getCohortPath() { + return cohortPath; } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/AbstractModification.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/AbstractModification.java index ffb263519a..5d9f96277d 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/AbstractModification.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/AbstractModification.java @@ -8,15 +8,22 @@ package org.opendaylight.controller.cluster.datastore.modification; + import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier; +import java.io.Serializable; + /** * Base class to be used for all simple modifications that can be applied to a DOMStoreTransaction */ -public abstract class AbstractModification implements Modification { - protected final InstanceIdentifier path; +public abstract class AbstractModification implements Modification, + Serializable { + + private static final long serialVersionUID = 1638042650152084457L; + + protected final InstanceIdentifier path; - protected AbstractModification(InstanceIdentifier path) { - this.path = path; - } + protected AbstractModification(InstanceIdentifier path) { + this.path = path; + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/CompositeModification.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/CompositeModification.java index 3a8eff1aa5..4c856d31eb 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/CompositeModification.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/CompositeModification.java @@ -18,5 +18,9 @@ import java.util.List; *

*/ public interface CompositeModification extends Modification { - List getModifications(); + /** + * Get a list of Modifications contained by this Composite + * @return + */ + List getModifications(); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/MutableCompositeModification.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/MutableCompositeModification.java index 983ac860e1..9f37ba42d3 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/MutableCompositeModification.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/MutableCompositeModification.java @@ -10,26 +10,40 @@ package org.opendaylight.controller.cluster.datastore.modification; import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; +import java.io.Serializable; import java.util.ArrayList; import java.util.Collections; import java.util.List; -public class MutableCompositeModification implements CompositeModification { - private final List modifications = new ArrayList<>(); +/** + * MutableCompositeModification is just a mutable version of a + * CompositeModification {@link org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification#addModification(Modification)} + */ +public class MutableCompositeModification + implements CompositeModification, Serializable { - @Override - public void apply(DOMStoreWriteTransaction transaction) { - for(Modification modification : modifications){ - modification.apply(transaction); - } - } + private static final long serialVersionUID = 1163377899140186790L; + + private final List modifications = new ArrayList<>(); - public void addModification(Modification modification){ - modifications.add(modification); - } + @Override + public void apply(DOMStoreWriteTransaction transaction) { + for (Modification modification : modifications) { + modification.apply(transaction); + } + } - public List getModifications(){ - return Collections.unmodifiableList(modifications); - } + /** + * Add a new Modification to the list of Modifications represented by this + * composite + * + * @param modification + */ + public void addModification(Modification modification) { + modifications.add(modification); + } + public List getModifications() { + return Collections.unmodifiableList(modifications); + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/WriteModification.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/WriteModification.java index c5b12d071a..1b2a87f42b 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/WriteModification.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/WriteModification.java @@ -16,8 +16,8 @@ import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; * WriteModification stores all the parameters required to write data to the specified path */ public class WriteModification extends AbstractModification { - private final NormalizedNode data; + private final NormalizedNode data; public WriteModification(InstanceIdentifier path, NormalizedNode data) { super(path); @@ -28,4 +28,5 @@ public class WriteModification extends AbstractModification { public void apply(DOMStoreWriteTransaction transaction) { transaction.write(path, data); } + } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/BasicIntegrationTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/BasicIntegrationTest.java new file mode 100644 index 0000000000..8c3ec82a54 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/BasicIntegrationTest.java @@ -0,0 +1,168 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.datastore; + +import akka.actor.ActorPath; +import akka.actor.ActorRef; +import akka.actor.ActorSelection; +import akka.actor.Props; +import akka.testkit.JavaTestKit; +import junit.framework.Assert; +import org.junit.Test; +import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction; +import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply; +import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction; +import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionChain; +import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionChainReply; +import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply; +import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransaction; +import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransactionReply; +import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction; +import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply; +import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext; +import org.opendaylight.controller.cluster.datastore.messages.WriteData; +import org.opendaylight.controller.cluster.datastore.messages.WriteDataReply; +import org.opendaylight.controller.md.cluster.datastore.model.TestModel; +import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; + +public class BasicIntegrationTest extends AbstractActorTest { + + @Test + public void integrationTest() { + // This test will + // - create a Shard + // - initiate a transaction + // - write something + // - read the transaction for commit + // - commit the transaction + + + new JavaTestKit(getSystem()) {{ + final Props props = Shard.props("config"); + final ActorRef shard = getSystem().actorOf(props); + + new Within(duration("5 seconds")) { + protected void run() { + + shard.tell( + new UpdateSchemaContext(TestModel.createTestContext()), + getRef()); + + shard.tell(new CreateTransactionChain(), getRef()); + + final ActorSelection transactionChain = + new ExpectMsg("match hint") { + protected ActorSelection match(Object in) { + if (in instanceof CreateTransactionChainReply) { + ActorPath transactionChainPath = + ((CreateTransactionChainReply) in) + .getTransactionChainPath(); + return getSystem() + .actorSelection(transactionChainPath); + } else { + throw noMatch(); + } + } + }.get(); // this extracts the received message + + Assert.assertNotNull(transactionChain); + + transactionChain.tell(new CreateTransaction(), getRef()); + + final ActorSelection transaction = + new ExpectMsg("match hint") { + protected ActorSelection match(Object in) { + if (in instanceof CreateTransactionReply) { + ActorPath transactionPath = + ((CreateTransactionReply) in) + .getTransactionPath(); + return getSystem() + .actorSelection(transactionPath); + } else { + throw noMatch(); + } + } + }.get(); // this extracts the received message + + Assert.assertNotNull(transaction); + + transaction.tell(new WriteData(TestModel.TEST_PATH, + ImmutableNodes.containerNode(TestModel.TEST_QNAME)), + getRef()); + + Boolean writeDone = new ExpectMsg("match hint") { + protected Boolean match(Object in) { + if (in instanceof WriteDataReply) { + return true; + } else { + throw noMatch(); + } + } + }.get(); // this extracts the received message + + Assert.assertTrue(writeDone); + + transaction.tell(new ReadyTransaction(), getRef()); + + final ActorSelection cohort = + new ExpectMsg("match hint") { + protected ActorSelection match(Object in) { + if (in instanceof ReadyTransactionReply) { + ActorPath cohortPath = + ((ReadyTransactionReply) in) + .getCohortPath(); + return getSystem() + .actorSelection(cohortPath); + } else { + throw noMatch(); + } + } + }.get(); // this extracts the received message + + Assert.assertNotNull(cohort); + + cohort.tell(new PreCommitTransaction(), getRef()); + + Boolean preCommitDone = + new ExpectMsg("match hint") { + protected Boolean match(Object in) { + if (in instanceof PreCommitTransactionReply) { + return true; + } else { + throw noMatch(); + } + } + }.get(); // this extracts the received message + + Assert.assertTrue(preCommitDone); + + cohort.tell(new CommitTransaction(), getRef()); + + final Boolean commitDone = + new ExpectMsg("match hint") { + protected Boolean match(Object in) { + if (in instanceof CommitTransactionReply) { + return true; + } else { + throw noMatch(); + } + } + }.get(); // this extracts the received message + + Assert.assertTrue(commitDone); + + } + + + }; + }}; + + + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java index 7f2a836b6f..a9d8042ce2 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java @@ -87,6 +87,8 @@ public class ShardTest extends AbstractActorTest{ }}; } + + private AsyncDataChangeListener> noOpDataChangeListener(){ return new AsyncDataChangeListener>() { @Override diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java index 075001b0d8..9116f24c92 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java @@ -43,7 +43,8 @@ public class ShardTransactionTest extends AbstractActorTest { @Test public void testOnReceiveReadData() throws Exception { new JavaTestKit(getSystem()) {{ - final Props props = ShardTransaction.props(store.newReadWriteTransaction()); + final ActorRef shard = getSystem().actorOf(Shard.props("config")); + final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard); final ActorRef subject = getSystem().actorOf(props, "testReadData"); new Within(duration("1 seconds")) { @@ -103,7 +104,8 @@ public class ShardTransactionTest extends AbstractActorTest { @Test public void testOnReceiveWriteData() throws Exception { new JavaTestKit(getSystem()) {{ - final Props props = ShardTransaction.props(store.newReadWriteTransaction()); + final ActorRef shard = getSystem().actorOf(Shard.props("config")); + final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard); final ActorRef subject = getSystem().actorOf(props, "testWriteData"); new Within(duration("1 seconds")) { @@ -136,7 +138,8 @@ public class ShardTransactionTest extends AbstractActorTest { @Test public void testOnReceiveMergeData() throws Exception { new JavaTestKit(getSystem()) {{ - final Props props = ShardTransaction.props(store.newReadWriteTransaction()); + final ActorRef shard = getSystem().actorOf(Shard.props("config")); + final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard); final ActorRef subject = getSystem().actorOf(props, "testMergeData"); new Within(duration("1 seconds")) { @@ -170,7 +173,8 @@ public class ShardTransactionTest extends AbstractActorTest { @Test public void testOnReceiveDeleteData() throws Exception { new JavaTestKit(getSystem()) {{ - final Props props = ShardTransaction.props(store.newReadWriteTransaction()); + final ActorRef shard = getSystem().actorOf(Shard.props("config")); + final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard); final ActorRef subject = getSystem().actorOf(props, "testDeleteData"); new Within(duration("1 seconds")) { @@ -204,7 +208,8 @@ public class ShardTransactionTest extends AbstractActorTest { @Test public void testOnReceiveReadyTransaction() throws Exception { new JavaTestKit(getSystem()) {{ - final Props props = ShardTransaction.props(store.newReadWriteTransaction()); + final ActorRef shard = getSystem().actorOf(Shard.props("config")); + final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard); final ActorRef subject = getSystem().actorOf(props, "testReadyTransaction"); new Within(duration("1 seconds")) { @@ -237,7 +242,8 @@ public class ShardTransactionTest extends AbstractActorTest { @Test public void testOnReceiveCloseTransaction() throws Exception { new JavaTestKit(getSystem()) {{ - final Props props = ShardTransaction.props(store.newReadWriteTransaction()); + final ActorRef shard = getSystem().actorOf(Shard.props("config")); + final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard); final ActorRef subject = getSystem().actorOf(props, "testCloseTransaction"); new Within(duration("1 seconds")) { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/application.conf b/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/application.conf new file mode 100644 index 0000000000..2647850667 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/application.conf @@ -0,0 +1,11 @@ +akka { + actor { + serializers { + java = "akka.serialization.JavaSerializer" + } + + serialization-bindings { + "org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification" = java + } + } +} \ No newline at end of file