X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShard.java;h=21fea96320f30754baa2c300877c768c895b4d02;hb=b725fdb758008195a98f7fad0fd3804c363170aa;hp=0717598f0c09d83b5c2d3b17a04462ed1f3a4149;hpb=3e35e82b3b253de08e3ea07d2af8b2c1696272e9;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index 0717598f0c..21fea96320 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -17,8 +17,6 @@ import akka.japi.Creator; import akka.serialization.Serialization; import com.google.common.base.Optional; import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.ListeningExecutorService; -import com.google.common.util.concurrent.MoreExecutors; import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardMBeanFactory; import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats; import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply; @@ -40,6 +38,7 @@ import org.opendaylight.controller.cluster.raft.RaftActor; import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener; import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore; +import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory; import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain; @@ -49,11 +48,11 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext; import scala.concurrent.duration.FiniteDuration; import java.util.ArrayList; +import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ExecutionException; -import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; /** @@ -68,9 +67,6 @@ public class Shard extends RaftActor { public static final String DEFAULT_NAME = "default"; - private final ListeningExecutorService storeExecutor = - MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(2)); - private final InMemoryDOMDataStore store; private final Map @@ -102,7 +98,7 @@ public class Shard extends RaftActor { LOG.info("Creating shard : {} persistent : {}", name, persistent); - store = new InMemoryDOMDataStore(name, storeExecutor); + store = InMemoryDOMDataStoreFactory.create(name, null); shardMBean = ShardMBeanFactory.getShardStatsMBean(name); @@ -148,7 +144,7 @@ public class Shard extends RaftActor { } else if (message instanceof PeerAddressResolved) { PeerAddressResolved resolved = (PeerAddressResolved) message; setPeerAddress(resolved.getPeerId(), resolved.getPeerAddress()); - } else { + } else{ super.onReceiveCommand(message); } } @@ -195,16 +191,14 @@ public class Shard extends RaftActor { getSender() .tell(new CreateTransactionReply( - Serialization.serializedActorPath(transactionActor), - createTransaction.getTransactionId()).toSerializable(), - getSelf() - ); + Serialization.serializedActorPath(transactionActor), + createTransaction.getTransactionId()).toSerializable(), + getSelf()); } private void commit(final ActorRef sender, Object serialized) { - Modification modification = - MutableCompositeModification.fromSerializable( - serialized, schemaContext); + Modification modification = MutableCompositeModification + .fromSerializable(serialized, schemaContext); DOMStoreThreePhaseCommitCohort cohort = modificationToCohort.remove(serialized); if (cohort == null) { @@ -221,32 +215,32 @@ public class Shard extends RaftActor { future.get(); future = commitCohort.commit(); future.get(); - } catch (InterruptedException e) { - LOG.error("Failed to commit", e); - } catch (ExecutionException e) { + } catch (InterruptedException | ExecutionException e) { + shardMBean.incrementFailedTransactionsCount(); LOG.error("Failed to commit", e); + return; } + //we want to just apply the recovery commit and return + shardMBean.incrementCommittedTransactionCount(); + return; } final ListenableFuture future = cohort.commit(); - shardMBean.incrementCommittedTransactionCount(); final ActorRef self = getSelf(); future.addListener(new Runnable() { @Override public void run() { try { future.get(); - - if (sender != null) { sender .tell(new CommitTransactionReply().toSerializable(), self); - } else { - LOG.error("sender is null ???"); - } + shardMBean.incrementCommittedTransactionCount(); + shardMBean.setLastCommittedTransactionTime(new Date()); + } catch (InterruptedException | ExecutionException e) { - // FIXME : Handle this properly - LOG.error(e, "An exception happened when committing"); + shardMBean.incrementFailedTransactionsCount(); + sender.tell(new akka.actor.Status.Failure(e),self); } } }, getContext().dispatcher()); @@ -323,8 +317,7 @@ public class Shard extends RaftActor { getSender() .tell(new CreateTransactionChainReply(transactionChain.path()) .toSerializable(), - getSelf() - ); + getSelf()); } @Override protected void applyState(ActorRef clientActor, String identifier,