package org.opendaylight.controller.cluster.datastore;
+import akka.actor.ActorRef;
import akka.actor.Props;
import akka.actor.UntypedActor;
+import akka.event.Logging;
+import akka.event.LoggingAdapter;
import akka.japi.Creator;
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
+import org.opendaylight.controller.cluster.datastore.messages.DeleteDataReply;
+import org.opendaylight.controller.cluster.datastore.messages.MergeData;
+import org.opendaylight.controller.cluster.datastore.messages.MergeDataReply;
+import org.opendaylight.controller.cluster.datastore.messages.ReadData;
+import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
+import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.WriteData;
+import org.opendaylight.controller.cluster.datastore.messages.WriteDataReply;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+
+import java.util.concurrent.ExecutionException;
/**
* The ShardTransaction Actor represents a remote transaction
* the ShardTransaction Actor only works with read-write transactions. This is just to keep the logic simple. At this
* time there are no known advantages for creating a read-only or write-only transaction which may change over time
* at which point we can optimize things in the distributed store as well.
+ *
+ * Handles Messages
+ * ----------------
+ * {@link org.opendaylight.controller.cluster.datastore.messages.ReadData}
+ * {@link org.opendaylight.controller.cluster.datastore.messages.WriteData}
+ * {@link org.opendaylight.controller.cluster.datastore.messages.MergeData}
+ * {@link org.opendaylight.controller.cluster.datastore.messages.DeleteData}
+ * {@link org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction}
+ * {@link org.opendaylight.controller.cluster.datastore.messages.CloseTransaction}
*/
public class ShardTransaction extends UntypedActor {
private final DOMStoreReadWriteTransaction transaction;
+ private final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
+
public ShardTransaction(DOMStoreReadWriteTransaction transaction) {
this.transaction = transaction;
}
@Override
public void onReceive(Object message) throws Exception {
- throw new UnsupportedOperationException("onReceive");
+ if(message instanceof ReadData){
+ readData((ReadData) message);
+ } else if(message instanceof WriteData){
+ writeData((WriteData) message);
+ } else if(message instanceof MergeData){
+ mergeData((MergeData) message);
+ } else if(message instanceof DeleteData){
+ deleteData((DeleteData) message);
+ } else if(message instanceof ReadyTransaction){
+ readyTransaction((ReadyTransaction) message);
+ } else if(message instanceof CloseTransaction){
+ closeTransaction((CloseTransaction) message);
+ }
+ }
+
+ private void readData(ReadData message) {
+ final ActorRef sender = getSender();
+ final ActorRef self = getSelf();
+ final InstanceIdentifier path = message.getPath();
+ final ListenableFuture<Optional<NormalizedNode<?, ?>>> future = transaction.read(path);
+
+ future.addListener(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ Optional<NormalizedNode<?, ?>> optional = future.get();
+ if(optional.isPresent()){
+ sender.tell(new ReadDataReply(optional.get()), self);
+ } else {
+ //TODO : Need to decide what to do here
+ }
+ } catch (InterruptedException | ExecutionException e) {
+ log.error(e, "An exception happened when reading data from path : " + path.toString());
+ }
+
+ }
+ }, getContext().dispatcher());
+ }
+
+
+ private void writeData(WriteData message){
+ transaction.write(message.getPath(), message.getData());
+ getSender().tell(new WriteDataReply(), getSelf());
+ }
+
+ private void mergeData(MergeData message){
+ transaction.merge(message.getPath(), message.getData());
+ getSender().tell(new MergeDataReply(), getSelf());
+ }
+
+ private void deleteData(DeleteData message){
+ transaction.delete(message.getPath());
+ getSender().tell(new DeleteDataReply(), getSelf());
+ }
+
+ private void readyTransaction(ReadyTransaction message){
+ DOMStoreThreePhaseCommitCohort cohort = transaction.ready();
+ ActorRef cohortActor = getContext().actorOf(ThreePhaseCommitCohort.props(cohort));
+ getSender().tell(new ReadyTransactionReply(cohortActor.path()), getSelf());
+
+ }
+
+ private void closeTransaction(CloseTransaction message){
+ transaction.close();
+ getSender().tell(new CloseTransactionReply(), getSelf());
}
}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore;
+
+import akka.actor.Props;
+import akka.actor.UntypedActor;
+import akka.japi.Creator;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+
+public class ThreePhaseCommitCohort extends UntypedActor{
+ private final DOMStoreThreePhaseCommitCohort cohort;
+
+ public ThreePhaseCommitCohort(DOMStoreThreePhaseCommitCohort cohort) {
+
+ this.cohort = cohort;
+ }
+
+ @Override
+ public void onReceive(Object message) throws Exception {
+ throw new UnsupportedOperationException("onReceive");
+ }
+
+ public static Props props(final DOMStoreThreePhaseCommitCohort cohort) {
+ return Props.create(new Creator<ThreePhaseCommitCohort>(){
+ @Override
+ public ThreePhaseCommitCohort create() throws Exception {
+ return new ThreePhaseCommitCohort(cohort);
+ }
+ });
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.messages;
+
+public class CloseTransaction {
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.messages;
+
+public class CloseTransactionReply {
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+
+public class DeleteData {
+ private final InstanceIdentifier path;
+
+ public DeleteData(InstanceIdentifier path) {
+ this.path = path;
+ }
+
+ public InstanceIdentifier getPath() {
+ return path;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.messages;
+
+public class DeleteDataReply {
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+
+public class MergeData extends ModifyData {
+ public MergeData(InstanceIdentifier path, NormalizedNode<?, ?> data) {
+ super(path, data);
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.messages;
+
+public class MergeDataReply {
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+
+public abstract class ModifyData {
+ private final InstanceIdentifier path;
+ private final NormalizedNode<?,?> data;
+
+ public ModifyData(InstanceIdentifier path, NormalizedNode<?, ?> data) {
+ this.path = path;
+ this.data = data;
+ }
+
+ public InstanceIdentifier getPath() {
+ return path;
+ }
+
+ public NormalizedNode<?, ?> getData() {
+ return data;
+ }
+
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+
+public class ReadData {
+ private final InstanceIdentifier path;
+
+ public ReadData(InstanceIdentifier path) {
+ this.path = path;
+ }
+
+ public InstanceIdentifier getPath() {
+ return path;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+
+public class ReadDataReply {
+ private final NormalizedNode<?, ?> normalizedNode;
+
+ public ReadDataReply(NormalizedNode<?, ?> normalizedNode){
+
+ this.normalizedNode = normalizedNode;
+ }
+
+ public NormalizedNode<?, ?> getNormalizedNode() {
+ return normalizedNode;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.messages;
+
+public class ReadyTransaction {
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import akka.actor.ActorPath;
+
+public class ReadyTransactionReply {
+ private final ActorPath path;
+
+ public ReadyTransactionReply(ActorPath path) {
+
+ this.path = path;
+ }
+
+ public ActorPath getPath() {
+ return path;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+
+public class WriteData extends ModifyData{
+
+ public WriteData(InstanceIdentifier path, NormalizedNode<?, ?> data) {
+ super(path, data);
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.messages;
+
+public class WriteDataReply {
+}
--- /dev/null
+package org.opendaylight.controller.cluster.datastore;
+
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import akka.testkit.JavaTestKit;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
+import org.opendaylight.controller.cluster.datastore.messages.DeleteDataReply;
+import org.opendaylight.controller.cluster.datastore.messages.MergeData;
+import org.opendaylight.controller.cluster.datastore.messages.MergeDataReply;
+import org.opendaylight.controller.cluster.datastore.messages.ReadData;
+import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
+import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.WriteData;
+import org.opendaylight.controller.cluster.datastore.messages.WriteDataReply;
+import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
+import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+
+import static org.junit.Assert.assertEquals;
+
+public class ShardTransactionTest extends AbstractActorTest {
+ private static ListeningExecutorService storeExecutor = MoreExecutors.listeningDecorator(MoreExecutors.sameThreadExecutor());
+
+ private static final InMemoryDOMDataStore store = new InMemoryDOMDataStore("OPER", storeExecutor);
+
+ static {
+ store.onGlobalContextUpdated(TestModel.createTestContext());
+ }
+
+ @Test
+ public void testOnReceiveReadData() throws Exception {
+ new JavaTestKit(getSystem()) {{
+ final Props props = ShardTransaction.props(store.newReadWriteTransaction());
+ final ActorRef subject = getSystem().actorOf(props, "testReadData");
+
+ new Within(duration("1 seconds")) {
+ protected void run() {
+
+ subject.tell(new ReadData(InstanceIdentifier.builder().build()), getRef());
+
+ final String out = new ExpectMsg<String>("match hint") {
+ // do not put code outside this method, will run afterwards
+ protected String match(Object in) {
+ if (in instanceof ReadDataReply) {
+ if (((ReadDataReply) in).getNormalizedNode() != null) {
+ return "match";
+ }
+ return null;
+ } else {
+ throw noMatch();
+ }
+ }
+ }.get(); // this extracts the received message
+
+ assertEquals("match", out);
+
+ expectNoMsg();
+ }
+
+
+ };
+ }};
+ }
+
+ @Test
+ public void testOnReceiveWriteData() throws Exception {
+ new JavaTestKit(getSystem()) {{
+ final Props props = ShardTransaction.props(store.newReadWriteTransaction());
+ final ActorRef subject = getSystem().actorOf(props, "testWriteData");
+
+ new Within(duration("1 seconds")) {
+ protected void run() {
+
+ subject.tell(new WriteData(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)), getRef());
+
+ final String out = new ExpectMsg<String>("match hint") {
+ // do not put code outside this method, will run afterwards
+ protected String match(Object in) {
+ if (in instanceof WriteDataReply) {
+ return "match";
+ } else {
+ throw noMatch();
+ }
+ }
+ }.get(); // this extracts the received message
+
+ assertEquals("match", out);
+
+ expectNoMsg();
+ }
+
+
+ };
+ }};
+ }
+
+ @Test
+ public void testOnReceiveMergeData() throws Exception {
+ new JavaTestKit(getSystem()) {{
+ final Props props = ShardTransaction.props(store.newReadWriteTransaction());
+ final ActorRef subject = getSystem().actorOf(props, "testMergeData");
+
+ new Within(duration("1 seconds")) {
+ protected void run() {
+
+ subject.tell(new MergeData(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)), getRef());
+
+ final String out = new ExpectMsg<String>("match hint") {
+ // do not put code outside this method, will run afterwards
+ protected String match(Object in) {
+ if (in instanceof MergeDataReply) {
+ return "match";
+ } else {
+ throw noMatch();
+ }
+ }
+ }.get(); // this extracts the received message
+
+ assertEquals("match", out);
+
+ expectNoMsg();
+ }
+
+
+ };
+ }};
+ }
+
+ @Test
+ public void testOnReceiveDeleteData() throws Exception {
+ new JavaTestKit(getSystem()) {{
+ final Props props = ShardTransaction.props(store.newReadWriteTransaction());
+ final ActorRef subject = getSystem().actorOf(props, "testDeleteData");
+
+ new Within(duration("1 seconds")) {
+ protected void run() {
+
+ subject.tell(new DeleteData(TestModel.TEST_PATH), getRef());
+
+ final String out = new ExpectMsg<String>("match hint") {
+ // do not put code outside this method, will run afterwards
+ protected String match(Object in) {
+ if (in instanceof DeleteDataReply) {
+ return "match";
+ } else {
+ throw noMatch();
+ }
+ }
+ }.get(); // this extracts the received message
+
+ assertEquals("match", out);
+
+ expectNoMsg();
+ }
+
+
+ };
+ }};
+ }
+
+
+ @Test
+ public void testOnReceiveReadyTransaction() throws Exception {
+ new JavaTestKit(getSystem()) {{
+ final Props props = ShardTransaction.props(store.newReadWriteTransaction());
+ final ActorRef subject = getSystem().actorOf(props, "testReadyTransaction");
+
+ new Within(duration("1 seconds")) {
+ protected void run() {
+
+ subject.tell(new ReadyTransaction(), getRef());
+
+ final String out = new ExpectMsg<String>("match hint") {
+ // do not put code outside this method, will run afterwards
+ protected String match(Object in) {
+ if (in instanceof ReadyTransactionReply) {
+ return "match";
+ } else {
+ throw noMatch();
+ }
+ }
+ }.get(); // this extracts the received message
+
+ assertEquals("match", out);
+
+ expectNoMsg();
+ }
+
+
+ };
+ }};
+
+ }
+
+ @Test
+ public void testOnReceiveCloseTransaction() throws Exception {
+ new JavaTestKit(getSystem()) {{
+ final Props props = ShardTransaction.props(store.newReadWriteTransaction());
+ final ActorRef subject = getSystem().actorOf(props, "testCloseTransaction");
+
+ new Within(duration("1 seconds")) {
+ protected void run() {
+
+ subject.tell(new CloseTransaction(), getRef());
+
+ final String out = new ExpectMsg<String>("match hint") {
+ // do not put code outside this method, will run afterwards
+ protected String match(Object in) {
+ if (in instanceof CloseTransactionReply) {
+ return "match";
+ } else {
+ throw noMatch();
+ }
+ }
+ }.get(); // this extracts the received message
+
+ assertEquals("match", out);
+
+ expectNoMsg();
+ }
+
+
+ };
+ }};
+
+ }
+
+
+}
\ No newline at end of file