From a2b92b2d72c28b9913131c0340f87d2424f44108 Mon Sep 17 00:00:00 2001 From: Moiz Raja Date: Tue, 24 Jun 2014 19:50:57 -0700 Subject: [PATCH] Complete implementation of ThreePhaseCommitCohortProxy Change-Id: I3a365da24cfc072b18be4208131ae10ce0dccc3a Signed-off-by: Moiz Raja --- .../ThreePhaseCommitCohortProxy.java | 100 +++++++++++++++++- .../cluster/datastore/TransactionProxy.java | 2 +- .../exceptions/PrimaryNotFoundException.java | 12 +++ .../exceptions/TimeoutException.java | 15 +++ .../cluster/datastore/utils/ActorContext.java | 11 +- .../ThreePhaseCommitCohortProxyTest.java | 82 ++++++++++++++ .../cluster/datastore/utils/TestUtils.java | 37 +++++++ 7 files changed, 249 insertions(+), 10 deletions(-) create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/exceptions/PrimaryNotFoundException.java create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/exceptions/TimeoutException.java create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxyTest.java create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/TestUtils.java diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxy.java index 197b3b70ce..36b6efa2f7 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxy.java @@ -9,11 +9,27 @@ package org.opendaylight.controller.cluster.datastore; import akka.actor.ActorPath; +import akka.actor.ActorSelection; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListenableFutureTask; +import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction; +import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply; +import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction; +import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply; +import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction; +import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply; +import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransaction; +import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransactionReply; +import org.opendaylight.controller.cluster.datastore.utils.ActorContext; import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.Collections; import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; /** * ThreePhaseCommitCohortProxy represents a set of remote cohort proxies @@ -21,27 +37,101 @@ import java.util.List; public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCohort{ + private static final Logger + LOG = LoggerFactory.getLogger(DistributedDataStore.class); + + private final ActorContext actorContext; private final List cohortPaths; + //FIXME : Use a thread pool here + private final ExecutorService executorService = Executors.newSingleThreadExecutor(); - public ThreePhaseCommitCohortProxy(List cohortPaths) { + public ThreePhaseCommitCohortProxy(ActorContext actorContext, List cohortPaths) { + this.actorContext = actorContext; this.cohortPaths = cohortPaths; } @Override public ListenableFuture canCommit() { - throw new UnsupportedOperationException("canCommit"); + Callable call = new Callable() { + + @Override public Boolean call() throws Exception { + for(ActorPath actorPath : cohortPaths){ + ActorSelection cohort = actorContext.actorSelection(actorPath); + + try { + Object response = + actorContext.executeRemoteOperation(cohort, + new CanCommitTransaction(), + ActorContext.ASK_DURATION); + + if (response instanceof CanCommitTransactionReply) { + CanCommitTransactionReply reply = + (CanCommitTransactionReply) response; + if (!reply.getCanCommit()) { + return false; + } + } + } catch(RuntimeException e){ + LOG.error("Unexpected Exception", e); + return false; + } + + + } + return true; + } + }; + + ListenableFutureTask + future = ListenableFutureTask.create(call); + + executorService.submit(future); + + return future; } @Override public ListenableFuture preCommit() { - throw new UnsupportedOperationException("preCommit"); + return voidOperation(new PreCommitTransaction(), PreCommitTransactionReply.class); } @Override public ListenableFuture abort() { - throw new UnsupportedOperationException("abort"); + return voidOperation(new AbortTransaction(), AbortTransactionReply.class); } @Override public ListenableFuture commit() { - throw new UnsupportedOperationException("commit"); + return voidOperation(new CommitTransaction(), CommitTransactionReply.class); + } + + private ListenableFuture voidOperation(final Object message, final Class expectedResponseClass){ + Callable call = new Callable() { + + @Override public Void call() throws Exception { + for(ActorPath actorPath : cohortPaths){ + ActorSelection cohort = actorContext.actorSelection(actorPath); + + Object response = actorContext.executeRemoteOperation(cohort, + message, + ActorContext.ASK_DURATION); + + if(response != null && !response.getClass().equals(expectedResponseClass)){ + throw new RuntimeException( + String.format( + "did not get the expected response \n\t\t expected : %s \n\t\t actual : %s", + expectedResponseClass.toString(), + response.getClass().toString())); + } + } + return null; + } + }; + + ListenableFutureTask + future = ListenableFutureTask.create(call); + + executorService.submit(future); + + return future; + } public List getCohortPaths() { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java index 32bb7d0951..811b851697 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java @@ -142,7 +142,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { } } - return new ThreePhaseCommitCohortProxy(cohortPaths); + return new ThreePhaseCommitCohortProxy(actorContext, cohortPaths); } @Override diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/exceptions/PrimaryNotFoundException.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/exceptions/PrimaryNotFoundException.java new file mode 100644 index 0000000000..5a131ade33 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/exceptions/PrimaryNotFoundException.java @@ -0,0 +1,12 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.datastore.exceptions; + +public class PrimaryNotFoundException extends RuntimeException { +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/exceptions/TimeoutException.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/exceptions/TimeoutException.java new file mode 100644 index 0000000000..4780aaccfb --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/exceptions/TimeoutException.java @@ -0,0 +1,15 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.datastore.exceptions; + +public class TimeoutException extends RuntimeException { + public TimeoutException(Exception e){ + super(e); + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java index ba4d4de6bf..0aa205fa06 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java @@ -13,6 +13,8 @@ import akka.actor.ActorRef; import akka.actor.ActorSelection; import akka.actor.ActorSystem; import akka.util.Timeout; +import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException; +import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException; import org.opendaylight.controller.cluster.datastore.messages.FindPrimary; import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound; import org.slf4j.Logger; @@ -81,7 +83,7 @@ public class ActorContext { return actorSystem.actorSelection(found.getPrimaryPath()); } - throw new RuntimeException("primary was not found"); + throw new PrimaryNotFoundException(); } /** @@ -99,7 +101,7 @@ public class ActorContext { try { return Await.result(future, AWAIT_DURATION); } catch (Exception e) { - throw new RuntimeException(e); + throw new TimeoutException(e); } } @@ -118,7 +120,7 @@ public class ActorContext { try { return Await.result(future, AWAIT_DURATION); } catch (Exception e) { - throw new RuntimeException(e); + throw new TimeoutException(e); } } @@ -131,7 +133,8 @@ public class ActorContext { * @param shardName * @param message * @param duration - * @throws java.lang.RuntimeException when a primary is not found or if the message to the remote shard fails or times out + * @throws org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException if the message to the remote shard times out + * @throws org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException if the primary shard is not found * * @return */ diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxyTest.java new file mode 100644 index 0000000000..af3da57571 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxyTest.java @@ -0,0 +1,82 @@ +package org.opendaylight.controller.cluster.datastore; + +import akka.actor.ActorRef; +import akka.actor.Props; +import com.google.common.util.concurrent.ListenableFuture; +import junit.framework.Assert; +import org.junit.Before; +import org.junit.Test; +import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply; +import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply; +import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply; +import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransactionReply; +import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor; +import org.opendaylight.controller.cluster.datastore.utils.MockActorContext; + +import java.util.Arrays; + +import static org.junit.Assert.assertNotNull; + +public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest { + + private ThreePhaseCommitCohortProxy proxy; + private Props props; + private ActorRef actorRef; + private MockActorContext actorContext; + + @Before + public void setUp(){ + props = Props.create(MessageCollectorActor.class); + actorRef = getSystem().actorOf(props); + actorContext = new MockActorContext(this.getSystem()); + + proxy = + new ThreePhaseCommitCohortProxy(actorContext, + Arrays.asList(actorRef.path())); + + } + + @Test + public void testCanCommit() throws Exception { + actorContext.setExecuteRemoteOperationResponse(new CanCommitTransactionReply(true)); + + ListenableFuture future = proxy.canCommit(); + + Assert.assertTrue(future.get().booleanValue()); + + } + + @Test + public void testPreCommit() throws Exception { + actorContext.setExecuteRemoteOperationResponse(new PreCommitTransactionReply()); + + ListenableFuture future = proxy.preCommit(); + + future.get(); + + } + + @Test + public void testAbort() throws Exception { + actorContext.setExecuteRemoteOperationResponse(new AbortTransactionReply()); + + ListenableFuture future = proxy.abort(); + + future.get(); + + } + + @Test + public void testCommit() throws Exception { + actorContext.setExecuteRemoteOperationResponse(new CommitTransactionReply()); + + ListenableFuture future = proxy.commit(); + + future.get(); + } + + @Test + public void testGetCohortPaths() throws Exception { + assertNotNull(proxy.getCohortPaths()); + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/TestUtils.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/TestUtils.java new file mode 100644 index 0000000000..bb881d5322 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/TestUtils.java @@ -0,0 +1,37 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.datastore.utils; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.Props; +import junit.framework.Assert; + +import java.util.List; + +public class TestUtils { + + public static void assertFirstSentMessage(ActorSystem actorSystem, ActorRef actorRef, Class clazz){ + ActorContext testContext = new ActorContext(actorSystem, actorSystem.actorOf( + Props.create(DoNothingActor.class))); + Object messages = testContext + .executeLocalOperation(actorRef, "messages", + ActorContext.ASK_DURATION); + + Assert.assertNotNull(messages); + + Assert.assertTrue(messages instanceof List); + + List listMessages = (List) messages; + + Assert.assertEquals(1, listMessages.size()); + + Assert.assertTrue(listMessages.get(0).getClass().equals(clazz)); + } +} -- 2.36.6