Simplify code using Java 8 features
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / Shard.java
index 50d4e0252f732c3e2f24b16970e83716f9286c99..dbcbd3d02eff96f393e389d6b92ee5487b83ce18 100644 (file)
@@ -60,6 +60,7 @@ import org.opendaylight.controller.cluster.common.actor.MessageTracker.Error;
 import org.opendaylight.controller.cluster.common.actor.MeteringBehavior;
 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
+import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardDataTreeListenerInfoMXBeanImpl;
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardMBeanFactory;
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
@@ -77,7 +78,6 @@ import org.opendaylight.controller.cluster.datastore.messages.OnDemandShardState
 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
 import org.opendaylight.controller.cluster.datastore.messages.PersistAbortTransactionPayload;
 import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
-import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
@@ -102,8 +102,8 @@ import org.opendaylight.controller.cluster.raft.messages.RequestLeadership;
 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 import org.opendaylight.yangtools.concepts.Identifier;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.TipProducingDataTree;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.opendaylight.yangtools.yang.model.api.SchemaContextProvider;
@@ -161,8 +161,12 @@ public class Shard extends RaftActor {
     /// The name of this shard
     private final String name;
 
+    private final String shardName;
+
     private final ShardStats shardMBean;
 
+    private final ShardDataTreeListenerInfoMXBeanImpl listenerInfoMXBean;
+
     private DatastoreContext datastoreContext;
 
     private final ShardCommitCoordinator commitCoordinator;
@@ -180,8 +184,6 @@ public class Shard extends RaftActor {
     private final ShardSnapshotCohort snapshotCohort;
 
     private final DataTreeChangeListenerSupport treeChangeSupport = new DataTreeChangeListenerSupport(this);
-    private final DataChangeListenerSupport changeSupport = new DataChangeListenerSupport(this);
-
 
     private ShardSnapshot restoreFromSnapshot;
 
@@ -201,6 +203,7 @@ public class Shard extends RaftActor {
                 Optional.of(builder.getDatastoreContext().getShardRaftConfig()), DataStoreVersions.CURRENT_VERSION);
 
         this.name = builder.getId().toString();
+        this.shardName = builder.getId().getShardName();
         this.datastoreContext = builder.getDatastoreContext();
         this.restoreFromSnapshot = builder.getRestoreFromSnapshot();
         this.frontendMetadata = new FrontendMetadata(name);
@@ -211,15 +214,12 @@ public class Shard extends RaftActor {
 
         ShardDataTreeChangeListenerPublisherActorProxy treeChangeListenerPublisher =
                 new ShardDataTreeChangeListenerPublisherActorProxy(getContext(), name + "-DTCL-publisher", name);
-        ShardDataChangeListenerPublisherActorProxy dataChangeListenerPublisher =
-                new ShardDataChangeListenerPublisherActorProxy(getContext(), name + "-DCL-publisher", name);
         if (builder.getDataTree() != null) {
             store = new ShardDataTree(this, builder.getSchemaContext(), builder.getDataTree(),
-                    treeChangeListenerPublisher, dataChangeListenerPublisher, name, frontendMetadata);
+                    treeChangeListenerPublisher, name, frontendMetadata);
         } else {
             store = new ShardDataTree(this, builder.getSchemaContext(), builder.getTreeType(),
-                    builder.getDatastoreContext().getStoreRoot(), treeChangeListenerPublisher,
-                    dataChangeListenerPublisher, name, frontendMetadata);
+                    builder.getDatastoreContext().getStoreRoot(), treeChangeListenerPublisher, name, frontendMetadata);
         }
 
         shardMBean = ShardMBeanFactory.getShardStatsMBean(name, datastoreContext.getDataStoreMXBeanType(), this);
@@ -257,6 +257,10 @@ public class Shard extends RaftActor {
                 .fileBackedStreamFactory(getRaftActorContext().getFileBackedOutputStreamFactory())
                 .assembledMessageCallback((message, sender) -> self().tell(message, sender))
                 .expireStateAfterInactivity(datastoreContext.getRequestTimeout(), TimeUnit.NANOSECONDS).build();
+
+        listenerInfoMXBean = new ShardDataTreeListenerInfoMXBeanImpl(name, datastoreContext.getDataStoreMXBeanType(),
+                self());
+        listenerInfoMXBean.register();
     }
 
     private void setTransactionCommitTimeout() {
@@ -285,6 +289,7 @@ public class Shard extends RaftActor {
         commitCoordinator.abortPendingTransactions("Transaction aborted due to shutdown.", this);
 
         shardMBean.unregisterMBean();
+        listenerInfoMXBean.unregister();
     }
 
     @Override
@@ -331,8 +336,6 @@ public class Shard extends RaftActor {
                 handleAbortTransaction(AbortTransaction.fromSerializable(message));
             } else if (CloseTransactionChain.isSerializedType(message)) {
                 closeTransactionChain(CloseTransactionChain.fromSerializable(message));
-            } else if (message instanceof RegisterChangeListener) {
-                changeSupport.onMessage((RegisterChangeListener) message, isLeader(), hasLeader());
             } else if (message instanceof RegisterDataTreeChangeListener) {
                 treeChangeSupport.onMessage((RegisterDataTreeChangeListener) message, isLeader(), hasLeader());
             } else if (message instanceof UpdateSchemaContext) {
@@ -392,9 +395,7 @@ public class Shard extends RaftActor {
                         responseMessageSlicer.slice(SliceOptions.builder().identifier(success.getTarget())
                             .message(envelope.newSuccessEnvelope(success, executionTimeNanos))
                             .sendTo(envelope.getMessage().getReplyTo()).replyTo(self())
-                            .onFailureCallback(t -> {
-                                LOG.warn("Error slicing response {}", success, t);
-                            }).build()));
+                            .onFailureCallback(t -> LOG.warn("Error slicing response {}", success, t)).build()));
                 } else {
                     envelope.sendSuccess(success, executionTimeNanos);
                 }
@@ -471,7 +472,8 @@ public class Shard extends RaftActor {
             }
             if (cmp > 0) {
                 LOG.debug("{}: rejecting request from outdated client {}", persistenceId(), clientId);
-                throw new RetiredGenerationException(existing.getIdentifier().getGeneration());
+                throw new RetiredGenerationException(clientId.getGeneration(),
+                    existing.getIdentifier().getGeneration());
             }
 
             LOG.info("{}: retiring state {}, outdated by request from client {}", persistenceId(), existing, clientId);
@@ -494,7 +496,8 @@ public class Shard extends RaftActor {
         throw new OutOfSequenceEnvelopeException(0);
     }
 
-    private static @Nonnull ABIVersion selectVersion(final ConnectClientRequest message) {
+    @Nonnull
+    private static ABIVersion selectVersion(final ConnectClientRequest message) {
         final Range<ABIVersion> clientRange = Range.closed(message.getMinVersion(), message.getMaxVersion());
         for (ABIVersion v : SUPPORTED_ABIVERSIONS) {
             if (clientRange.contains(v)) {
@@ -542,7 +545,8 @@ public class Shard extends RaftActor {
         }
     }
 
-    private @Nullable RequestSuccess<?, ?> handleRequest(final RequestEnvelope envelope, final long now)
+    @Nullable
+    private RequestSuccess<?, ?> handleRequest(final RequestEnvelope envelope, final long now)
             throws RequestException {
         // We are not the leader, hence we want to fail-fast.
         if (!isLeader() || paused || !isLeaderActive()) {
@@ -584,6 +588,10 @@ public class Shard extends RaftActor {
         return roleChangeNotifier;
     }
 
+    String getShardName() {
+        return shardName;
+    }
+
     @Override
     protected LeaderStateChanged newLeaderStateChanged(final String memberId, final String leaderId,
             final short leaderPayloadVersion) {
@@ -675,7 +683,7 @@ public class Shard extends RaftActor {
             ActorSelection leader = getLeader();
             if (!isLeaderActive || leader == null) {
                 messageRetrySupport.addMessageToRetry(batched, getSender(),
-                        "Could not commit transaction " + batched.getTransactionId());
+                        "Could not process BatchedModifications " + batched.getTransactionId());
             } else {
                 // If this is not the first batch and leadership changed in between batched messages,
                 // we need to reconstruct previous BatchedModifications from the transaction
@@ -728,7 +736,7 @@ public class Shard extends RaftActor {
             ActorSelection leader = getLeader();
             if (!isLeaderActive || leader == null) {
                 messageRetrySupport.addMessageToRetry(message, getSender(),
-                        "Could not commit transaction " + message.getTransactionId());
+                        "Could not process ready local transaction " + message.getTransactionId());
             } else {
                 LOG.debug("{}: Forwarding ReadyLocalTransaction to leader {}", persistenceId(), leader);
                 message.setRemoteVersion(getCurrentBehavior().getLeaderPayloadVersion());
@@ -747,12 +755,13 @@ public class Shard extends RaftActor {
             ActorSelection leader = getLeader();
             if (!isLeaderActive || leader == null) {
                 messageRetrySupport.addMessageToRetry(forwardedReady, getSender(),
-                        "Could not commit transaction " + forwardedReady.getTransactionId());
+                        "Could not process forwarded ready transaction " + forwardedReady.getTransactionId());
             } else {
                 LOG.debug("{}: Forwarding ForwardedReadyTransaction to leader {}", persistenceId(), leader);
 
                 ReadyLocalTransaction readyLocal = new ReadyLocalTransaction(forwardedReady.getTransactionId(),
-                        forwardedReady.getTransaction().getSnapshot(), forwardedReady.isDoImmediateCommit());
+                        forwardedReady.getTransaction().getSnapshot(), forwardedReady.isDoImmediateCommit(),
+                        forwardedReady.getParticipatingShardNames());
                 readyLocal.setRemoteVersion(getCurrentBehavior().getLeaderPayloadVersion());
                 leader.forward(readyLocal, getContext());
             }
@@ -831,8 +840,11 @@ public class Shard extends RaftActor {
     @Override
     @Nonnull
     protected RaftActorRecoveryCohort getRaftActorRecoveryCohort() {
-        return new ShardRecoveryCoordinator(store,
-            restoreFromSnapshot != null ? restoreFromSnapshot.getSnapshot() : null, persistenceId(), LOG);
+        if (restoreFromSnapshot == null) {
+            return ShardRecoveryCoordinator.create(store, persistenceId(), LOG);
+        }
+
+        return ShardRecoveryCoordinator.forSnapshot(store, persistenceId(), LOG, restoreFromSnapshot.getSnapshot());
     }
 
     @Override
@@ -870,7 +882,6 @@ public class Shard extends RaftActor {
     protected void onStateChanged() {
         boolean isLeader = isLeader();
         boolean hasLeader = hasLeader();
-        changeSupport.onLeadershipChange(isLeader, hasLeader);
         treeChangeSupport.onLeadershipChange(isLeader, hasLeader);
 
         // If this actor is no longer the leader close all the transaction chains
@@ -919,6 +930,8 @@ public class Shard extends RaftActor {
                             messagesToForward.size(), leader);
 
                     for (Object message : messagesToForward) {
+                        LOG.debug("{}: Forwarding pending transaction message {}", persistenceId(), message);
+
                         leader.tell(message, self());
                     }
                 }
@@ -971,9 +984,8 @@ public class Shard extends RaftActor {
     }
 
     @Override
-    protected OnDemandRaftState.AbstractBuilder<?> newOnDemandRaftStateBuilder() {
+    protected OnDemandRaftState.AbstractBuilder<?, ?> newOnDemandRaftStateBuilder() {
         return OnDemandShardState.newBuilder().treeChangeListenerActors(treeChangeSupport.getListenerActors())
-                .dataChangeListenerActors(changeSupport.getListenerActors())
                 .commitCohortActors(store.getCohortActors());
     }
 
@@ -1012,7 +1024,7 @@ public class Shard extends RaftActor {
         private DatastoreContext datastoreContext;
         private SchemaContextProvider schemaContextProvider;
         private DatastoreSnapshot.ShardSnapshot restoreFromSnapshot;
-        private TipProducingDataTree dataTree;
+        private DataTree dataTree;
         private volatile boolean sealed;
 
         protected AbstractBuilder(final Class<S> shardClass) {
@@ -1046,9 +1058,9 @@ public class Shard extends RaftActor {
             return self();
         }
 
-        public T schemaContextProvider(final SchemaContextProvider schemaContextProvider) {
+        public T schemaContextProvider(final SchemaContextProvider newSchemaContextProvider) {
             checkSealed();
-            this.schemaContextProvider = Preconditions.checkNotNull(schemaContextProvider);
+            this.schemaContextProvider = Preconditions.checkNotNull(newSchemaContextProvider);
             return self();
         }
 
@@ -1058,7 +1070,7 @@ public class Shard extends RaftActor {
             return self();
         }
 
-        public T dataTree(final TipProducingDataTree newDataTree) {
+        public T dataTree(final DataTree newDataTree) {
             checkSealed();
             this.dataTree = newDataTree;
             return self();
@@ -1084,7 +1096,7 @@ public class Shard extends RaftActor {
             return restoreFromSnapshot;
         }
 
-        public TipProducingDataTree getDataTree() {
+        public DataTree getDataTree() {
             return dataTree;
         }
 
@@ -1115,7 +1127,7 @@ public class Shard extends RaftActor {
     }
 
     public static class Builder extends AbstractBuilder<Builder, Shard> {
-        private Builder() {
+        Builder() {
             super(Shard.class);
         }
     }