Merge "Fixed remove rpc api in rpc registry"
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / Shard.java
index 0717598f0c09d83b5c2d3b17a04462ed1f3a4149..21fea96320f30754baa2c300877c768c895b4d02 100644 (file)
@@ -17,8 +17,6 @@ import akka.japi.Creator;
 import akka.serialization.Serialization;
 import com.google.common.base.Optional;
 import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardMBeanFactory;
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
@@ -40,6 +38,7 @@ import org.opendaylight.controller.cluster.raft.RaftActor;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
+import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
@@ -49,11 +48,11 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import scala.concurrent.duration.FiniteDuration;
 
 import java.util.ArrayList;
+import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -68,9 +67,6 @@ public class Shard extends RaftActor {
 
     public static final String DEFAULT_NAME = "default";
 
-    private final ListeningExecutorService storeExecutor =
-        MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(2));
-
     private final InMemoryDOMDataStore store;
 
     private final Map<Object, DOMStoreThreePhaseCommitCohort>
@@ -102,7 +98,7 @@ public class Shard extends RaftActor {
 
         LOG.info("Creating shard : {} persistent : {}", name, persistent);
 
-        store = new InMemoryDOMDataStore(name, storeExecutor);
+        store = InMemoryDOMDataStoreFactory.create(name, null);
 
         shardMBean = ShardMBeanFactory.getShardStatsMBean(name);
 
@@ -148,7 +144,7 @@ public class Shard extends RaftActor {
         } else if (message instanceof PeerAddressResolved) {
             PeerAddressResolved resolved = (PeerAddressResolved) message;
             setPeerAddress(resolved.getPeerId(), resolved.getPeerAddress());
-        } else {
+        } else{
             super.onReceiveCommand(message);
         }
     }
@@ -195,16 +191,14 @@ public class Shard extends RaftActor {
 
         getSender()
             .tell(new CreateTransactionReply(
-                    Serialization.serializedActorPath(transactionActor),
-                    createTransaction.getTransactionId()).toSerializable(),
-                getSelf()
-            );
+                Serialization.serializedActorPath(transactionActor),
+                createTransaction.getTransactionId()).toSerializable(),
+                getSelf());
     }
 
     private void commit(final ActorRef sender, Object serialized) {
-        Modification modification =
-            MutableCompositeModification.fromSerializable(
-                serialized, schemaContext);
+        Modification modification = MutableCompositeModification
+            .fromSerializable(serialized, schemaContext);
         DOMStoreThreePhaseCommitCohort cohort =
             modificationToCohort.remove(serialized);
         if (cohort == null) {
@@ -221,32 +215,32 @@ public class Shard extends RaftActor {
                 future.get();
                 future = commitCohort.commit();
                 future.get();
-            } catch (InterruptedException e) {
-                LOG.error("Failed to commit", e);
-            } catch (ExecutionException e) {
+            } catch (InterruptedException | ExecutionException e) {
+                shardMBean.incrementFailedTransactionsCount();
                 LOG.error("Failed to commit", e);
+                return;
             }
+            //we want to just apply the recovery commit and return
+            shardMBean.incrementCommittedTransactionCount();
+            return;
         }
 
         final ListenableFuture<Void> future = cohort.commit();
-        shardMBean.incrementCommittedTransactionCount();
         final ActorRef self = getSelf();
         future.addListener(new Runnable() {
             @Override
             public void run() {
                 try {
                     future.get();
-
-                    if (sender != null) {
                         sender
                             .tell(new CommitTransactionReply().toSerializable(),
                                 self);
-                    } else {
-                        LOG.error("sender is null ???");
-                    }
+                        shardMBean.incrementCommittedTransactionCount();
+                        shardMBean.setLastCommittedTransactionTime(new Date());
+
                 } catch (InterruptedException | ExecutionException e) {
-                    // FIXME : Handle this properly
-                    LOG.error(e, "An exception happened when committing");
+                    shardMBean.incrementFailedTransactionsCount();
+                    sender.tell(new akka.actor.Status.Failure(e),self);
                 }
             }
         }, getContext().dispatcher());
@@ -323,8 +317,7 @@ public class Shard extends RaftActor {
         getSender()
             .tell(new CreateTransactionChainReply(transactionChain.path())
                     .toSerializable(),
-                getSelf()
-            );
+                getSelf());
     }
 
     @Override protected void applyState(ActorRef clientActor, String identifier,