Bug-2397:Provide a mechanism for stakeholders to get notifications on Raft state...
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / Shard.java
index 1bf32e7fca7191236afeeb2c7ed0048b28db499c..7073ea758b2b751d93708914bfc3935779076908 100644 (file)
@@ -8,6 +8,25 @@
 
 package org.opendaylight.controller.cluster.datastore;
 
+import akka.actor.ActorRef;
+import akka.actor.ActorSelection;
+import akka.actor.Cancellable;
+import akka.actor.PoisonPill;
+import akka.actor.Props;
+import akka.event.Logging;
+import akka.event.LoggingAdapter;
+import akka.japi.Creator;
+import akka.persistence.RecoveryFailure;
+import akka.serialization.Serialization;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.InvalidProtocolBufferException;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
@@ -46,6 +65,7 @@ import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContex
 import org.opendaylight.controller.cluster.datastore.modification.Modification;
 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
+import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier;
 import org.opendaylight.controller.cluster.raft.RaftActor;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
@@ -65,25 +85,6 @@ import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import scala.concurrent.duration.Duration;
 import scala.concurrent.duration.FiniteDuration;
-import akka.actor.ActorRef;
-import akka.actor.ActorSelection;
-import akka.actor.Cancellable;
-import akka.actor.PoisonPill;
-import akka.actor.Props;
-import akka.event.Logging;
-import akka.event.LoggingAdapter;
-import akka.japi.Creator;
-import akka.persistence.RecoveryFailure;
-import akka.serialization.Serialization;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.InvalidProtocolBufferException;
 
 /**
  * A Shard represents a portion of the logical data tree <br/>
@@ -93,8 +94,6 @@ import com.google.protobuf.InvalidProtocolBufferException;
  */
 public class Shard extends RaftActor {
 
-    private static final int HELIUM_1_TX_VERSION = 1;
-
     private static final Object COMMIT_TRANSACTION_REPLY = new CommitTransactionReply().toSerializable();
 
     private static final Object TX_COMMIT_TIMEOUT_CHECK_MESSAGE = "txCommitTimeoutCheck";
@@ -133,6 +132,8 @@ public class Shard extends RaftActor {
 
     private Cancellable txCommitTimeoutCheckSchedule;
 
+    private Optional<ActorRef> roleChangeNotifier;
+
     /**
      * Coordinates persistence recovery on startup.
      */
@@ -141,8 +142,8 @@ public class Shard extends RaftActor {
 
     private final Map<String, DOMStoreTransactionChain> transactionChains = new HashMap<>();
 
-    protected Shard(ShardIdentifier name, Map<ShardIdentifier, String> peerAddresses,
-            DatastoreContext datastoreContext, SchemaContext schemaContext) {
+    protected Shard(final ShardIdentifier name, final Map<ShardIdentifier, String> peerAddresses,
+            final DatastoreContext datastoreContext, final SchemaContext schemaContext) {
         super(name.toString(), mapPeerAddresses(peerAddresses),
                 Optional.of(datastoreContext.getShardRaftConfig()));
 
@@ -162,7 +163,6 @@ public class Shard extends RaftActor {
 
         shardMBean = ShardMBeanFactory.getShardStatsMBean(name.toString(),
                 datastoreContext.getDataStoreMXBeanType());
-        shardMBean.setDataStoreExecutor(store.getDomStoreExecutor());
         shardMBean.setNotificationManager(store.getDataChangeListenerNotificationManager());
 
         if (isMetricsCaptureEnabled()) {
@@ -174,10 +174,13 @@ public class Shard extends RaftActor {
 
         transactionCommitTimeout = TimeUnit.MILLISECONDS.convert(
                 datastoreContext.getShardTransactionCommitTimeoutInSeconds(), TimeUnit.SECONDS);
+
+        // create a notifier actor for each cluster member
+        roleChangeNotifier = createRoleChangeNotifier(name.toString());
     }
 
     private static Map<String, String> mapPeerAddresses(
-        Map<ShardIdentifier, String> peerAddresses) {
+        final Map<ShardIdentifier, String> peerAddresses) {
         Map<String, String> map = new HashMap<>();
 
         for (Map.Entry<ShardIdentifier, String> entry : peerAddresses
@@ -190,7 +193,7 @@ public class Shard extends RaftActor {
 
     public static Props props(final ShardIdentifier name,
         final Map<ShardIdentifier, String> peerAddresses,
-        DatastoreContext datastoreContext, SchemaContext schemaContext) {
+        final DatastoreContext datastoreContext, final SchemaContext schemaContext) {
         Preconditions.checkNotNull(name, "name should not be null");
         Preconditions.checkNotNull(peerAddresses, "peerAddresses should not be null");
         Preconditions.checkNotNull(datastoreContext, "dataStoreContext should not be null");
@@ -199,6 +202,12 @@ public class Shard extends RaftActor {
         return Props.create(new ShardCreator(name, peerAddresses, datastoreContext, schemaContext));
     }
 
+    private Optional<ActorRef> createRoleChangeNotifier(String shardId) {
+        ActorRef shardRoleChangeNotifier = this.getContext().actorOf(
+            RoleChangeNotifier.getProps(shardId), shardId + "-notifier");
+        return Optional.<ActorRef>of(shardRoleChangeNotifier);
+    }
+
     @Override
     public void postStop() {
         super.postStop();
@@ -209,7 +218,7 @@ public class Shard extends RaftActor {
     }
 
     @Override
-    public void onReceiveRecover(Object message) throws Exception {
+    public void onReceiveRecover(final Object message) throws Exception {
         if(LOG.isDebugEnabled()) {
             LOG.debug("onReceiveRecover: Received message {} from {}",
                 message.getClass().toString(),
@@ -228,7 +237,7 @@ public class Shard extends RaftActor {
     }
 
     @Override
-    public void onReceiveCommand(Object message) throws Exception {
+    public void onReceiveCommand(final Object message) throws Exception {
         if(LOG.isDebugEnabled()) {
             LOG.debug("onReceiveCommand: Received message {} from {}", message, getSender());
         }
@@ -262,6 +271,11 @@ public class Shard extends RaftActor {
         }
     }
 
+    @Override
+    protected Optional<ActorRef> getRoleChangeNotifier() {
+        return roleChangeNotifier;
+    }
+
     private void handleTransactionCommitTimeoutCheck() {
         CohortEntry cohortEntry = commitCoordinator.getCurrentCohortEntry();
         if(cohortEntry != null) {
@@ -275,7 +289,7 @@ public class Shard extends RaftActor {
         }
     }
 
-    private void handleCommitTransaction(CommitTransaction commit) {
+    private void handleCommitTransaction(final CommitTransaction commit) {
         final String transactionID = commit.getTransactionID();
 
         LOG.debug("Committing transaction {}", transactionID);
@@ -370,7 +384,7 @@ public class Shard extends RaftActor {
         commitCoordinator.currentTransactionComplete(transactionID, true);
     }
 
-    private void handleCanCommitTransaction(CanCommitTransaction canCommit) {
+    private void handleCanCommitTransaction(final CanCommitTransaction canCommit) {
         LOG.debug("Can committing transaction {}", canCommit.getTransactionID());
         commitCoordinator.handleCanCommit(canCommit, getSender(), self());
     }
@@ -391,7 +405,7 @@ public class Shard extends RaftActor {
         // transactionId so to maintain backwards compatibility, we create a separate cohort actor
         // to provide the compatible behavior.
         ActorRef replyActorPath = self();
-        if(ready.getTxnClientVersion() < HELIUM_1_TX_VERSION) {
+        if(ready.getTxnClientVersion() < CreateTransaction.HELIUM_1_VERSION) {
             LOG.debug("Creating BackwardsCompatibleThreePhaseCommitCohort");
             replyActorPath = getContext().actorOf(BackwardsCompatibleThreePhaseCommitCohort.props(
                     ready.getTransactionID()));
@@ -403,11 +417,11 @@ public class Shard extends RaftActor {
                 readyTransactionReply, getSelf());
     }
 
-    private void handleAbortTransaction(AbortTransaction abort) {
+    private void handleAbortTransaction(final AbortTransaction abort) {
         doAbortTransaction(abort.getTransactionID(), getSender());
     }
 
-    private void doAbortTransaction(String transactionID, final ActorRef sender) {
+    private void doAbortTransaction(final String transactionID, final ActorRef sender) {
         final CohortEntry cohortEntry = commitCoordinator.getCohortEntryIfCurrent(transactionID);
         if(cohortEntry != null) {
             LOG.debug("Aborting transaction {}", transactionID);
@@ -422,7 +436,7 @@ public class Shard extends RaftActor {
 
             Futures.addCallback(future, new FutureCallback<Void>() {
                 @Override
-                public void onSuccess(Void v) {
+                public void onSuccess(final Void v) {
                     shardMBean.incrementAbortTransactionsCount();
 
                     if(sender != null) {
@@ -431,7 +445,7 @@ public class Shard extends RaftActor {
                 }
 
                 @Override
-                public void onFailure(Throwable t) {
+                public void onFailure(final Throwable t) {
                     LOG.error(t, "An exception happened during abort");
 
                     if(sender != null) {
@@ -442,7 +456,7 @@ public class Shard extends RaftActor {
         }
     }
 
-    private void handleCreateTransaction(Object message) {
+    private void handleCreateTransaction(final Object message) {
         if (isLeader()) {
             createTransaction(CreateTransaction.fromSerializable(message));
         } else if (getLeader() != null) {
@@ -455,7 +469,7 @@ public class Shard extends RaftActor {
         }
     }
 
-    private void handleReadDataReply(Object message) {
+    private void handleReadDataReply(final Object message) {
         // This must be for install snapshot. Don't want to open this up and trigger
         // deSerialization
 
@@ -469,7 +483,7 @@ public class Shard extends RaftActor {
         getSender().tell(PoisonPill.getInstance(), self());
     }
 
-    private void closeTransactionChain(CloseTransactionChain closeTransactionChain) {
+    private void closeTransactionChain(final CloseTransactionChain closeTransactionChain) {
         DOMStoreTransactionChain chain =
             transactionChains.remove(closeTransactionChain.getTransactionChainId());
 
@@ -492,8 +506,8 @@ public class Shard extends RaftActor {
             }
         }
 
-        if(this.schemaContext == null){
-            throw new NullPointerException("schemaContext should not be null");
+        if(this.schemaContext == null) {
+            throw new IllegalStateException("SchemaContext is not set");
         }
 
         if (transactionType == TransactionProxy.TransactionType.READ_ONLY.ordinal()) {
@@ -534,9 +548,16 @@ public class Shard extends RaftActor {
     }
 
     private void createTransaction(CreateTransaction createTransaction) {
-        createTransaction(createTransaction.getTransactionType(),
-            createTransaction.getTransactionId(), createTransaction.getTransactionChainId(),
-            createTransaction.getClientVersion());
+        try {
+            ActorRef transactionActor = createTransaction(createTransaction.getTransactionType(),
+                createTransaction.getTransactionId(), createTransaction.getTransactionChainId(),
+                createTransaction.getVersion());
+
+            getSender().tell(new CreateTransactionReply(Serialization.serializedActorPath(transactionActor),
+                    createTransaction.getTransactionId()).toSerializable(), getSelf());
+        } catch (Exception e) {
+            getSender().tell(new akka.actor.Status.Failure(e), getSelf());
+        }
     }
 
     private ActorRef createTransaction(int transactionType, String remoteTransactionId,
@@ -546,29 +567,25 @@ public class Shard extends RaftActor {
             ShardTransactionIdentifier.builder()
                 .remoteTransactionId(remoteTransactionId)
                 .build();
+
         if(LOG.isDebugEnabled()) {
             LOG.debug("Creating transaction : {} ", transactionId);
         }
+
         ActorRef transactionActor = createTypedTransactionActor(transactionType, transactionId,
                 transactionChainId, clientVersion);
 
-        getSender()
-            .tell(new CreateTransactionReply(
-                    Serialization.serializedActorPath(transactionActor),
-                    remoteTransactionId).toSerializable(),
-                getSelf());
-
         return transactionActor;
     }
 
-    private void syncCommitTransaction(DOMStoreWriteTransaction transaction)
+    private void syncCommitTransaction(final DOMStoreWriteTransaction transaction)
         throws ExecutionException, InterruptedException {
         DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready();
         commitCohort.preCommit().get();
         commitCohort.commit().get();
     }
 
-    private void commitWithNewTransaction(Modification modification) {
+    private void commitWithNewTransaction(final Modification modification) {
         DOMStoreWriteTransaction tx = store.newWriteOnlyTransaction();
         modification.apply(tx);
         try {
@@ -581,18 +598,18 @@ public class Shard extends RaftActor {
         }
     }
 
-    private void updateSchemaContext(UpdateSchemaContext message) {
+    private void updateSchemaContext(final UpdateSchemaContext message) {
         this.schemaContext = message.getSchemaContext();
         updateSchemaContext(message.getSchemaContext());
         store.onGlobalContextUpdated(message.getSchemaContext());
     }
 
     @VisibleForTesting
-    void updateSchemaContext(SchemaContext schemaContext) {
+    void updateSchemaContext(final SchemaContext schemaContext) {
         store.onGlobalContextUpdated(schemaContext);
     }
 
-    private void registerChangeListener(RegisterChangeListener registerChangeListener) {
+    private void registerChangeListener(final RegisterChangeListener registerChangeListener) {
 
         LOG.debug("registerDataChangeListener for {}", registerChangeListener.getPath());
 
@@ -615,12 +632,12 @@ public class Shard extends RaftActor {
         LOG.debug("registerDataChangeListener sending reply, listenerRegistrationPath = {} ",
                     listenerRegistration.path());
 
-        getSender().tell(new RegisterChangeListenerReply(listenerRegistration.path()),getSelf());
+        getSender().tell(new RegisterChangeListenerReply(listenerRegistration.path()), getSelf());
     }
 
     private ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier,
                                                NormalizedNode<?, ?>>> doChangeListenerRegistration(
-            RegisterChangeListener registerChangeListener) {
+            final RegisterChangeListener registerChangeListener) {
 
         ActorSelection dataChangeListenerPath = getContext().system().actorSelection(
                 registerChangeListener.getDataChangeListenerPath());
@@ -650,7 +667,7 @@ public class Shard extends RaftActor {
 
     @Override
     protected
-    void startLogRecoveryBatch(int maxBatchSize) {
+    void startLogRecoveryBatch(final int maxBatchSize) {
         currentLogRecoveryBatch = Lists.newArrayListWithCapacity(maxBatchSize);
 
         if(LOG.isDebugEnabled()) {
@@ -659,7 +676,7 @@ public class Shard extends RaftActor {
     }
 
     @Override
-    protected void appendRecoveredLogEntry(Payload data) {
+    protected void appendRecoveredLogEntry(final Payload data) {
         if (data instanceof CompositeModificationPayload) {
             currentLogRecoveryBatch.add(((CompositeModificationPayload) data).getModification());
         } else {
@@ -668,7 +685,7 @@ public class Shard extends RaftActor {
     }
 
     @Override
-    protected void applyRecoverySnapshot(ByteString snapshot) {
+    protected void applyRecoverySnapshot(final ByteString snapshot) {
         if(recoveryCoordinator == null) {
             recoveryCoordinator = new ShardRecoveryCoordinator(persistenceId(), schemaContext);
         }
@@ -733,7 +750,7 @@ public class Shard extends RaftActor {
     }
 
     @Override
-    protected void applyState(ActorRef clientActor, String identifier, Object data) {
+    protected void applyState(final ActorRef clientActor, final String identifier, final Object data) {
 
         if (data instanceof CompositeModificationPayload) {
             Object modification = ((CompositeModificationPayload) data).getModification();
@@ -782,7 +799,7 @@ public class Shard extends RaftActor {
             createSnapshotTransaction = createTransaction(
                 TransactionProxy.TransactionType.READ_ONLY.ordinal(),
                 "createSnapshot" + ++createSnapshotTransactionCounter, "",
-                CreateTransaction.CURRENT_CLIENT_VERSION);
+                CreateTransaction.CURRENT_VERSION);
 
             createSnapshotTransaction.tell(
                 new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(), self());
@@ -792,7 +809,7 @@ public class Shard extends RaftActor {
 
     @VisibleForTesting
     @Override
-    protected void applySnapshot(ByteString snapshot) {
+    protected void applySnapshot(final ByteString snapshot) {
         // Since this will be done only on Recovery or when this actor is a Follower
         // we can safely commit everything in here. We not need to worry about event notifications
         // as they would have already been disabled on the follower
@@ -857,7 +874,7 @@ public class Shard extends RaftActor {
         return dataPersistenceProvider;
     }
 
-    @Override protected void onLeaderChanged(String oldLeader, String newLeader) {
+    @Override protected void onLeaderChanged(final String oldLeader, final String newLeader) {
         shardMBean.setLeader(newLeader);
     }
 
@@ -879,8 +896,8 @@ public class Shard extends RaftActor {
         final DatastoreContext datastoreContext;
         final SchemaContext schemaContext;
 
-        ShardCreator(ShardIdentifier name, Map<ShardIdentifier, String> peerAddresses,
-                DatastoreContext datastoreContext, SchemaContext schemaContext) {
+        ShardCreator(final ShardIdentifier name, final Map<ShardIdentifier, String> peerAddresses,
+                final DatastoreContext datastoreContext, final SchemaContext schemaContext) {
             this.name = name;
             this.peerAddresses = peerAddresses;
             this.datastoreContext = datastoreContext;
@@ -913,11 +930,11 @@ public class Shard extends RaftActor {
         private volatile ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier,
                                                              NormalizedNode<?, ?>>> delegate;
 
-        DelayedListenerRegistration(RegisterChangeListener registerChangeListener) {
+        DelayedListenerRegistration(final RegisterChangeListener registerChangeListener) {
             this.registerChangeListener = registerChangeListener;
         }
 
-        void setDelegate( ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier,
+        void setDelegate( final ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier,
                                             NormalizedNode<?, ?>>> registration) {
             this.delegate = registration;
         }