Complete implementation of ThreePhaseCommitCohortProxy 18/8318/6
authorMoiz Raja <moraja@cisco.com>
Wed, 25 Jun 2014 02:50:57 +0000 (19:50 -0700)
committerMoiz Raja <moraja@cisco.com>
Thu, 3 Jul 2014 21:08:26 +0000 (14:08 -0700)
Change-Id: I3a365da24cfc072b18be4208131ae10ce0dccc3a
Signed-off-by: Moiz Raja <moraja@cisco.com>
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/exceptions/PrimaryNotFoundException.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/exceptions/TimeoutException.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxyTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/TestUtils.java [new file with mode: 0644]

index 197b3b70cefa292e3829b776560b4cd2ae99967f..36b6efa2f7091d9f178d682aded4fc4888f42dff 100644 (file)
@@ -9,11 +9,27 @@
 package org.opendaylight.controller.cluster.datastore;
 
 import akka.actor.ActorPath;
+import akka.actor.ActorSelection;
 import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListenableFutureTask;
+import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransactionReply;
+import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
 /**
  * ThreePhaseCommitCohortProxy represents a set of remote cohort proxies
@@ -21,27 +37,101 @@ import java.util.List;
 public class ThreePhaseCommitCohortProxy implements
     DOMStoreThreePhaseCommitCohort{
 
+    private static final Logger
+        LOG = LoggerFactory.getLogger(DistributedDataStore.class);
+
+    private final ActorContext actorContext;
     private final List<ActorPath> cohortPaths;
+    //FIXME : Use a thread pool here
+    private final ExecutorService executorService = Executors.newSingleThreadExecutor();
 
-    public ThreePhaseCommitCohortProxy(List<ActorPath> cohortPaths) {
 
+    public ThreePhaseCommitCohortProxy(ActorContext actorContext, List<ActorPath> cohortPaths) {
+        this.actorContext = actorContext;
         this.cohortPaths = cohortPaths;
     }
 
     @Override public ListenableFuture<Boolean> canCommit() {
-        throw new UnsupportedOperationException("canCommit");
+        Callable<Boolean> call = new Callable() {
+
+            @Override public Boolean call() throws Exception {
+            for(ActorPath actorPath : cohortPaths){
+                ActorSelection cohort = actorContext.actorSelection(actorPath);
+
+                try {
+                    Object response =
+                        actorContext.executeRemoteOperation(cohort,
+                            new CanCommitTransaction(),
+                            ActorContext.ASK_DURATION);
+
+                    if (response instanceof CanCommitTransactionReply) {
+                        CanCommitTransactionReply reply =
+                            (CanCommitTransactionReply) response;
+                        if (!reply.getCanCommit()) {
+                            return false;
+                        }
+                    }
+                } catch(RuntimeException e){
+                    LOG.error("Unexpected Exception", e);
+                    return false;
+                }
+
+
+            }
+            return true;
+            }
+        };
+
+        ListenableFutureTask<Boolean>
+            future = ListenableFutureTask.create(call);
+
+        executorService.submit(future);
+
+        return future;
     }
 
     @Override public ListenableFuture<Void> preCommit() {
-        throw new UnsupportedOperationException("preCommit");
+        return voidOperation(new PreCommitTransaction(), PreCommitTransactionReply.class);
     }
 
     @Override public ListenableFuture<Void> abort() {
-        throw new UnsupportedOperationException("abort");
+        return voidOperation(new AbortTransaction(), AbortTransactionReply.class);
     }
 
     @Override public ListenableFuture<Void> commit() {
-        throw new UnsupportedOperationException("commit");
+        return voidOperation(new CommitTransaction(), CommitTransactionReply.class);
+    }
+
+    private ListenableFuture<Void> voidOperation(final Object message, final Class expectedResponseClass){
+        Callable<Void> call = new Callable<Void>() {
+
+            @Override public Void call() throws Exception {
+                for(ActorPath actorPath : cohortPaths){
+                    ActorSelection cohort = actorContext.actorSelection(actorPath);
+
+                    Object response = actorContext.executeRemoteOperation(cohort,
+                        message,
+                        ActorContext.ASK_DURATION);
+
+                    if(response != null && !response.getClass().equals(expectedResponseClass)){
+                        throw new RuntimeException(
+                            String.format(
+                                "did not get the expected response \n\t\t expected : %s \n\t\t actual   : %s",
+                                expectedResponseClass.toString(),
+                                response.getClass().toString()));
+                    }
+                }
+                return null;
+            }
+        };
+
+        ListenableFutureTask<Void>
+            future = ListenableFutureTask.create(call);
+
+        executorService.submit(future);
+
+        return future;
+
     }
 
     public List<ActorPath> getCohortPaths() {
index 32bb7d0951964975b850c8a1a685ce7d95c03f47..811b851697910cf19d94a5079b1054b8635e8190 100644 (file)
@@ -142,7 +142,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
             }
         }
 
-        return new ThreePhaseCommitCohortProxy(cohortPaths);
+        return new ThreePhaseCommitCohortProxy(actorContext, cohortPaths);
     }
 
     @Override
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/exceptions/PrimaryNotFoundException.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/exceptions/PrimaryNotFoundException.java
new file mode 100644 (file)
index 0000000..5a131ad
--- /dev/null
@@ -0,0 +1,12 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.exceptions;
+
+public class PrimaryNotFoundException extends RuntimeException {
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/exceptions/TimeoutException.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/exceptions/TimeoutException.java
new file mode 100644 (file)
index 0000000..4780aac
--- /dev/null
@@ -0,0 +1,15 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.exceptions;
+
+public class TimeoutException extends RuntimeException {
+    public TimeoutException(Exception e){
+        super(e);
+    }
+}
index ba4d4de6bfaeed985bf1e70c626c4a3bd365a0b1..0aa205fa0621033e575275599c9d68c1ff2c137b 100644 (file)
@@ -13,6 +13,8 @@ import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import akka.actor.ActorSystem;
 import akka.util.Timeout;
+import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
+import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
 import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
 import org.slf4j.Logger;
@@ -81,7 +83,7 @@ public class ActorContext {
 
             return actorSystem.actorSelection(found.getPrimaryPath());
         }
-        throw new RuntimeException("primary was not found");
+        throw new PrimaryNotFoundException();
     }
 
     /**
@@ -99,7 +101,7 @@ public class ActorContext {
         try {
             return Await.result(future, AWAIT_DURATION);
         } catch (Exception e) {
-            throw new RuntimeException(e);
+            throw new TimeoutException(e);
         }
     }
 
@@ -118,7 +120,7 @@ public class ActorContext {
         try {
             return Await.result(future, AWAIT_DURATION);
         } catch (Exception e) {
-            throw new RuntimeException(e);
+            throw new TimeoutException(e);
         }
     }
 
@@ -131,7 +133,8 @@ public class ActorContext {
      * @param shardName
      * @param message
      * @param duration
-     * @throws java.lang.RuntimeException when a primary is not found or if the message to the remote shard fails or times out
+     * @throws org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException if the message to the remote shard times out
+     * @throws org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException if the primary shard is not found
      *
      * @return
      */
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxyTest.java
new file mode 100644 (file)
index 0000000..af3da57
--- /dev/null
@@ -0,0 +1,82 @@
+package org.opendaylight.controller.cluster.datastore;
+
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import com.google.common.util.concurrent.ListenableFuture;
+import junit.framework.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransactionReply;
+import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor;
+import org.opendaylight.controller.cluster.datastore.utils.MockActorContext;
+
+import java.util.Arrays;
+
+import static org.junit.Assert.assertNotNull;
+
+public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
+
+    private ThreePhaseCommitCohortProxy proxy;
+    private Props props;
+    private ActorRef actorRef;
+    private MockActorContext actorContext;
+
+    @Before
+    public void setUp(){
+        props = Props.create(MessageCollectorActor.class);
+        actorRef = getSystem().actorOf(props);
+        actorContext = new MockActorContext(this.getSystem());
+
+        proxy =
+            new ThreePhaseCommitCohortProxy(actorContext,
+                Arrays.asList(actorRef.path()));
+
+    }
+
+    @Test
+    public void testCanCommit() throws Exception {
+        actorContext.setExecuteRemoteOperationResponse(new CanCommitTransactionReply(true));
+
+        ListenableFuture<Boolean> future = proxy.canCommit();
+
+        Assert.assertTrue(future.get().booleanValue());
+
+    }
+
+    @Test
+    public void testPreCommit() throws Exception {
+        actorContext.setExecuteRemoteOperationResponse(new PreCommitTransactionReply());
+
+        ListenableFuture<Void> future = proxy.preCommit();
+
+        future.get();
+
+    }
+
+    @Test
+    public void testAbort() throws Exception {
+        actorContext.setExecuteRemoteOperationResponse(new AbortTransactionReply());
+
+        ListenableFuture<Void> future = proxy.abort();
+
+        future.get();
+
+    }
+
+    @Test
+    public void testCommit() throws Exception {
+        actorContext.setExecuteRemoteOperationResponse(new CommitTransactionReply());
+
+        ListenableFuture<Void> future = proxy.commit();
+
+        future.get();
+    }
+
+    @Test
+    public void testGetCohortPaths() throws Exception {
+        assertNotNull(proxy.getCohortPaths());
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/TestUtils.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/TestUtils.java
new file mode 100644 (file)
index 0000000..bb881d5
--- /dev/null
@@ -0,0 +1,37 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.utils;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+import junit.framework.Assert;
+
+import java.util.List;
+
+public class TestUtils {
+
+    public static void assertFirstSentMessage(ActorSystem actorSystem, ActorRef actorRef, Class clazz){
+        ActorContext testContext = new ActorContext(actorSystem, actorSystem.actorOf(
+            Props.create(DoNothingActor.class)));
+        Object messages = testContext
+            .executeLocalOperation(actorRef, "messages",
+                ActorContext.ASK_DURATION);
+
+        Assert.assertNotNull(messages);
+
+        Assert.assertTrue(messages instanceof List);
+
+        List<Object> listMessages = (List<Object>) messages;
+
+        Assert.assertEquals(1, listMessages.size());
+
+        Assert.assertTrue(listMessages.get(0).getClass().equals(clazz));
+    }
+}