package org.opendaylight.controller.cluster.datastore;
import akka.actor.ActorPath;
+import akka.actor.ActorSelection;
import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListenableFutureTask;
+import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransactionReply;
+import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.Collections;
import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
/**
* ThreePhaseCommitCohortProxy represents a set of remote cohort proxies
public class ThreePhaseCommitCohortProxy implements
DOMStoreThreePhaseCommitCohort{
+ private static final Logger
+ LOG = LoggerFactory.getLogger(DistributedDataStore.class);
+
+ private final ActorContext actorContext;
private final List<ActorPath> cohortPaths;
+ //FIXME : Use a thread pool here
+ private final ExecutorService executorService = Executors.newSingleThreadExecutor();
- public ThreePhaseCommitCohortProxy(List<ActorPath> cohortPaths) {
+ public ThreePhaseCommitCohortProxy(ActorContext actorContext, List<ActorPath> cohortPaths) {
+ this.actorContext = actorContext;
this.cohortPaths = cohortPaths;
}
@Override public ListenableFuture<Boolean> canCommit() {
- throw new UnsupportedOperationException("canCommit");
+ Callable<Boolean> call = new Callable() {
+
+ @Override public Boolean call() throws Exception {
+ for(ActorPath actorPath : cohortPaths){
+ ActorSelection cohort = actorContext.actorSelection(actorPath);
+
+ try {
+ Object response =
+ actorContext.executeRemoteOperation(cohort,
+ new CanCommitTransaction(),
+ ActorContext.ASK_DURATION);
+
+ if (response instanceof CanCommitTransactionReply) {
+ CanCommitTransactionReply reply =
+ (CanCommitTransactionReply) response;
+ if (!reply.getCanCommit()) {
+ return false;
+ }
+ }
+ } catch(RuntimeException e){
+ LOG.error("Unexpected Exception", e);
+ return false;
+ }
+
+
+ }
+ return true;
+ }
+ };
+
+ ListenableFutureTask<Boolean>
+ future = ListenableFutureTask.create(call);
+
+ executorService.submit(future);
+
+ return future;
}
@Override public ListenableFuture<Void> preCommit() {
- throw new UnsupportedOperationException("preCommit");
+ return voidOperation(new PreCommitTransaction(), PreCommitTransactionReply.class);
}
@Override public ListenableFuture<Void> abort() {
- throw new UnsupportedOperationException("abort");
+ return voidOperation(new AbortTransaction(), AbortTransactionReply.class);
}
@Override public ListenableFuture<Void> commit() {
- throw new UnsupportedOperationException("commit");
+ return voidOperation(new CommitTransaction(), CommitTransactionReply.class);
+ }
+
+ private ListenableFuture<Void> voidOperation(final Object message, final Class expectedResponseClass){
+ Callable<Void> call = new Callable<Void>() {
+
+ @Override public Void call() throws Exception {
+ for(ActorPath actorPath : cohortPaths){
+ ActorSelection cohort = actorContext.actorSelection(actorPath);
+
+ Object response = actorContext.executeRemoteOperation(cohort,
+ message,
+ ActorContext.ASK_DURATION);
+
+ if(response != null && !response.getClass().equals(expectedResponseClass)){
+ throw new RuntimeException(
+ String.format(
+ "did not get the expected response \n\t\t expected : %s \n\t\t actual : %s",
+ expectedResponseClass.toString(),
+ response.getClass().toString()));
+ }
+ }
+ return null;
+ }
+ };
+
+ ListenableFutureTask<Void>
+ future = ListenableFutureTask.create(call);
+
+ executorService.submit(future);
+
+ return future;
+
}
public List<ActorPath> getCohortPaths() {
}
}
- return new ThreePhaseCommitCohortProxy(cohortPaths);
+ return new ThreePhaseCommitCohortProxy(actorContext, cohortPaths);
}
@Override
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.exceptions;
+
+public class PrimaryNotFoundException extends RuntimeException {
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.exceptions;
+
+public class TimeoutException extends RuntimeException {
+ public TimeoutException(Exception e){
+ super(e);
+ }
+}
import akka.actor.ActorSelection;
import akka.actor.ActorSystem;
import akka.util.Timeout;
+import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
+import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
import org.slf4j.Logger;
return actorSystem.actorSelection(found.getPrimaryPath());
}
- throw new RuntimeException("primary was not found");
+ throw new PrimaryNotFoundException();
}
/**
try {
return Await.result(future, AWAIT_DURATION);
} catch (Exception e) {
- throw new RuntimeException(e);
+ throw new TimeoutException(e);
}
}
try {
return Await.result(future, AWAIT_DURATION);
} catch (Exception e) {
- throw new RuntimeException(e);
+ throw new TimeoutException(e);
}
}
* @param shardName
* @param message
* @param duration
- * @throws java.lang.RuntimeException when a primary is not found or if the message to the remote shard fails or times out
+ * @throws org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException if the message to the remote shard times out
+ * @throws org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException if the primary shard is not found
*
* @return
*/
--- /dev/null
+package org.opendaylight.controller.cluster.datastore;
+
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import com.google.common.util.concurrent.ListenableFuture;
+import junit.framework.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransactionReply;
+import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor;
+import org.opendaylight.controller.cluster.datastore.utils.MockActorContext;
+
+import java.util.Arrays;
+
+import static org.junit.Assert.assertNotNull;
+
+public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
+
+ private ThreePhaseCommitCohortProxy proxy;
+ private Props props;
+ private ActorRef actorRef;
+ private MockActorContext actorContext;
+
+ @Before
+ public void setUp(){
+ props = Props.create(MessageCollectorActor.class);
+ actorRef = getSystem().actorOf(props);
+ actorContext = new MockActorContext(this.getSystem());
+
+ proxy =
+ new ThreePhaseCommitCohortProxy(actorContext,
+ Arrays.asList(actorRef.path()));
+
+ }
+
+ @Test
+ public void testCanCommit() throws Exception {
+ actorContext.setExecuteRemoteOperationResponse(new CanCommitTransactionReply(true));
+
+ ListenableFuture<Boolean> future = proxy.canCommit();
+
+ Assert.assertTrue(future.get().booleanValue());
+
+ }
+
+ @Test
+ public void testPreCommit() throws Exception {
+ actorContext.setExecuteRemoteOperationResponse(new PreCommitTransactionReply());
+
+ ListenableFuture<Void> future = proxy.preCommit();
+
+ future.get();
+
+ }
+
+ @Test
+ public void testAbort() throws Exception {
+ actorContext.setExecuteRemoteOperationResponse(new AbortTransactionReply());
+
+ ListenableFuture<Void> future = proxy.abort();
+
+ future.get();
+
+ }
+
+ @Test
+ public void testCommit() throws Exception {
+ actorContext.setExecuteRemoteOperationResponse(new CommitTransactionReply());
+
+ ListenableFuture<Void> future = proxy.commit();
+
+ future.get();
+ }
+
+ @Test
+ public void testGetCohortPaths() throws Exception {
+ assertNotNull(proxy.getCohortPaths());
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.utils;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+import junit.framework.Assert;
+
+import java.util.List;
+
+public class TestUtils {
+
+ public static void assertFirstSentMessage(ActorSystem actorSystem, ActorRef actorRef, Class clazz){
+ ActorContext testContext = new ActorContext(actorSystem, actorSystem.actorOf(
+ Props.create(DoNothingActor.class)));
+ Object messages = testContext
+ .executeLocalOperation(actorRef, "messages",
+ ActorContext.ASK_DURATION);
+
+ Assert.assertNotNull(messages);
+
+ Assert.assertTrue(messages instanceof List);
+
+ List<Object> listMessages = (List<Object>) messages;
+
+ Assert.assertEquals(1, listMessages.size());
+
+ Assert.assertTrue(listMessages.get(0).getClass().equals(clazz));
+ }
+}