Add MXBean to report shard registered DTCL/DCL info
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / Shard.java
index c003d901af800c6df9505e25a52b3501c7a0e4ee..1c85aef97899bc464f103d217089a6b9da137eb1 100644 (file)
@@ -11,9 +11,11 @@ package org.opendaylight.controller.cluster.datastore;
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import akka.actor.Cancellable;
+import akka.actor.ExtendedActorSystem;
 import akka.actor.Props;
 import akka.actor.Status;
 import akka.actor.Status.Failure;
+import akka.serialization.JavaSerializer;
 import akka.serialization.Serialization;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
@@ -58,6 +60,7 @@ import org.opendaylight.controller.cluster.common.actor.MessageTracker.Error;
 import org.opendaylight.controller.cluster.common.actor.MeteringBehavior;
 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
+import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardDataTreeListenerInfoMXBeanImpl;
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardMBeanFactory;
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
@@ -82,6 +85,7 @@ import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContex
 import org.opendaylight.controller.cluster.datastore.persisted.AbortTransactionPayload;
 import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
 import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot.ShardSnapshot;
+import org.opendaylight.controller.cluster.messaging.MessageAssembler;
 import org.opendaylight.controller.cluster.messaging.MessageSlicer;
 import org.opendaylight.controller.cluster.messaging.SliceOptions;
 import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
@@ -160,6 +164,8 @@ public class Shard extends RaftActor {
 
     private final ShardStats shardMBean;
 
+    private final ShardDataTreeListenerInfoMXBeanImpl listenerInfoMXBean;
+
     private DatastoreContext datastoreContext;
 
     private final ShardCommitCoordinator commitCoordinator;
@@ -191,6 +197,8 @@ public class Shard extends RaftActor {
     private final MessageSlicer responseMessageSlicer;
     private final Dispatchers dispatchers;
 
+    private final MessageAssembler requestMessageAssembler;
+
     protected Shard(final AbstractBuilder<?, ?> builder) {
         super(builder.getId().toString(), builder.getPeerAddresses(),
                 Optional.of(builder.getDatastoreContext().getShardRaftConfig()), DataStoreVersions.CURRENT_VERSION);
@@ -247,6 +255,15 @@ public class Shard extends RaftActor {
                 .messageSliceSize(datastoreContext.getMaximumMessageSliceSize())
                 .fileBackedStreamFactory(getRaftActorContext().getFileBackedOutputStreamFactory())
                 .expireStateAfterInactivity(2, TimeUnit.MINUTES).build();
+
+        requestMessageAssembler = MessageAssembler.builder().logContext(this.name)
+                .fileBackedStreamFactory(getRaftActorContext().getFileBackedOutputStreamFactory())
+                .assembledMessageCallback((message, sender) -> self().tell(message, sender))
+                .expireStateAfterInactivity(datastoreContext.getRequestTimeout(), TimeUnit.NANOSECONDS).build();
+
+        listenerInfoMXBean = new ShardDataTreeListenerInfoMXBeanImpl(name, datastoreContext.getDataStoreMXBeanType(),
+                self());
+        listenerInfoMXBean.register();
     }
 
     private void setTransactionCommitTimeout() {
@@ -275,6 +292,7 @@ public class Shard extends RaftActor {
         commitCoordinator.abortPendingTransactions("Transaction aborted due to shutdown.", this);
 
         shardMBean.unregisterMBean();
+        listenerInfoMXBean.unregister();
     }
 
     @Override
@@ -301,6 +319,8 @@ public class Shard extends RaftActor {
 
             if (message instanceof RequestEnvelope) {
                 handleRequestEnvelope((RequestEnvelope)message);
+            } else if (MessageAssembler.isHandledMessage(message)) {
+                handleRequestAssemblerMessage(message);
             } else if (message instanceof ConnectClientRequest) {
                 handleConnectClient((ConnectClientRequest)message);
             } else if (CreateTransaction.isSerializedType(message)) {
@@ -361,6 +381,13 @@ public class Shard extends RaftActor {
         }
     }
 
+    private void handleRequestAssemblerMessage(final Object message) {
+        dispatchers.getDispatcher(DispatcherType.Serialization).execute(() -> {
+            JavaSerializer.currentSystem().value_$eq((ExtendedActorSystem) context().system());
+            requestMessageAssembler.handleMessage(message, self());
+        });
+    }
+
     @SuppressWarnings("checkstyle:IllegalCatch")
     private void handleRequestEnvelope(final RequestEnvelope envelope) {
         final long now = ticker().read();
@@ -393,6 +420,7 @@ public class Shard extends RaftActor {
     private void commitTimeoutCheck() {
         store.checkForExpiredTransactions(transactionCommitTimeout, this::updateAccess);
         commitCoordinator.checkForExpiredTransactions(transactionCommitTimeout, this);
+        requestMessageAssembler.checkExpiredAssembledMessageState();
     }
 
     private Optional<Long> updateAccess(final SimpleShardDataTreeCohort cohort) {
@@ -881,6 +909,8 @@ public class Shard extends RaftActor {
                 knownFrontends = ImmutableMap.of();
             }
 
+            requestMessageAssembler.close();
+
             if (!hasLeader()) {
                 // No leader anywhere, nothing else to do
                 return;
@@ -949,7 +979,7 @@ public class Shard extends RaftActor {
     }
 
     @Override
-    protected OnDemandRaftState.AbstractBuilder<?> newOnDemandRaftStateBuilder() {
+    protected OnDemandRaftState.AbstractBuilder<?, ?> newOnDemandRaftStateBuilder() {
         return OnDemandShardState.newBuilder().treeChangeListenerActors(treeChangeSupport.getListenerActors())
                 .dataChangeListenerActors(changeSupport.getListenerActors())
                 .commitCohortActors(store.getCohortActors());