/* * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ package org.opendaylight.controller.cluster.datastore; import akka.actor.ActorPath; import akka.actor.ActorSelection; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListenableFutureTask; import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction; import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply; import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction; import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply; import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction; import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply; import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransaction; import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransactionReply; import org.opendaylight.controller.cluster.datastore.utils.ActorContext; import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Collections; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * ThreePhaseCommitCohortProxy represents a set of remote cohort proxies */ public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCohort{ private static final Logger LOG = LoggerFactory.getLogger(DistributedDataStore.class); private final ActorContext actorContext; private final List cohortPaths; //FIXME : Use a thread pool here private final ExecutorService executorService = Executors.newSingleThreadExecutor(); public ThreePhaseCommitCohortProxy(ActorContext actorContext, List cohortPaths) { this.actorContext = actorContext; this.cohortPaths = cohortPaths; } @Override public ListenableFuture canCommit() { Callable call = new Callable() { @Override public Boolean call() throws Exception { for(ActorPath actorPath : cohortPaths){ ActorSelection cohort = actorContext.actorSelection(actorPath); try { Object response = actorContext.executeRemoteOperation(cohort, new CanCommitTransaction(), ActorContext.ASK_DURATION); if (response instanceof CanCommitTransactionReply) { CanCommitTransactionReply reply = (CanCommitTransactionReply) response; if (!reply.getCanCommit()) { return false; } } } catch(RuntimeException e){ LOG.error("Unexpected Exception", e); return false; } } return true; } }; ListenableFutureTask future = ListenableFutureTask.create(call); executorService.submit(future); return future; } @Override public ListenableFuture preCommit() { return voidOperation(new PreCommitTransaction(), PreCommitTransactionReply.class); } @Override public ListenableFuture abort() { return voidOperation(new AbortTransaction(), AbortTransactionReply.class); } @Override public ListenableFuture commit() { return voidOperation(new CommitTransaction(), CommitTransactionReply.class); } private ListenableFuture voidOperation(final Object message, final Class expectedResponseClass){ Callable call = new Callable() { @Override public Void call() throws Exception { for(ActorPath actorPath : cohortPaths){ ActorSelection cohort = actorContext.actorSelection(actorPath); Object response = actorContext.executeRemoteOperation(cohort, message, ActorContext.ASK_DURATION); if(response != null && !response.getClass().equals(expectedResponseClass)){ throw new RuntimeException( String.format( "did not get the expected response \n\t\t expected : %s \n\t\t actual : %s", expectedResponseClass.toString(), response.getClass().toString())); } } return null; } }; ListenableFutureTask future = ListenableFutureTask.create(call); executorService.submit(future); return future; } public List getCohortPaths() { return Collections.unmodifiableList(this.cohortPaths); } }