Improve segmented journal actor metrics
[controller.git] / opendaylight / md-sal / sal-remoterpc-connector / src / main / java / org / opendaylight / controller / remote / rpc / registry / gossip / BucketStoreActor.java
index 6fbe6ceb4d6c1867d17c5f0983aedcb5b51740b3..f155880c0185677e20e4187df79684858f8be934 100644 (file)
@@ -1,13 +1,15 @@
 /*
- * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ * Copyright (c) 2014, 2017 Cisco Systems, Inc. and others.  All rights reserved.
  *
  * This program and the accompanying materials are made available under the
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
-
 package org.opendaylight.controller.remote.rpc.registry.gossip;
 
+import static com.google.common.base.Preconditions.checkState;
+import static com.google.common.base.Verify.verify;
+import static java.util.Objects.requireNonNull;
 import static org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreAccess.Singletons.GET_ALL_BUCKETS;
 import static org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreAccess.Singletons.GET_BUCKET_VERSIONS;
 
@@ -25,8 +27,6 @@ import akka.persistence.SaveSnapshotSuccess;
 import akka.persistence.SnapshotOffer;
 import akka.persistence.SnapshotSelectionCriteria;
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Verify;
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.SetMultimap;
@@ -37,7 +37,7 @@ import java.util.Map.Entry;
 import java.util.Optional;
 import java.util.function.Consumer;
 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
-import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
+import org.opendaylight.controller.remote.rpc.RemoteOpsProviderConfig;
 
 /**
  * A store that syncs its data across nodes in the cluster.
@@ -72,7 +72,7 @@ public abstract class BucketStoreActor<T extends BucketData<T>> extends
      */
     private final SetMultimap<ActorRef, Address> watchedActors = HashMultimap.create(1, 1);
 
-    private final RemoteRpcProviderConfig config;
+    private final RemoteOpsProviderConfig config;
     private final String persistenceId;
 
     /**
@@ -88,10 +88,10 @@ public abstract class BucketStoreActor<T extends BucketData<T>> extends
     private Integer incarnation;
     private boolean persisting;
 
-    protected BucketStoreActor(final RemoteRpcProviderConfig config, final String persistenceId, final T initialData) {
-        this.config = Preconditions.checkNotNull(config);
-        this.initialData = Preconditions.checkNotNull(initialData);
-        this.persistenceId = Preconditions.checkNotNull(persistenceId);
+    protected BucketStoreActor(final RemoteOpsProviderConfig config, final String persistenceId, final T initialData) {
+        this.config = requireNonNull(config);
+        this.initialData = requireNonNull(initialData);
+        this.persistenceId = requireNonNull(persistenceId);
     }
 
     static ExecuteInActor getBucketsByMembersMessage(final Collection<Address> members) {
@@ -106,6 +106,14 @@ public abstract class BucketStoreActor<T extends BucketData<T>> extends
         return actor -> actor.updateRemoteBuckets(buckets);
     }
 
+    static ExecuteInActor getLocalDataMessage() {
+        return actor -> actor.getSender().tell(actor.getLocalData(), actor.getSelf());
+    }
+
+    static ExecuteInActor getRemoteBucketsMessage() {
+        return actor -> actor.getSender().tell(ImmutableMap.copyOf(actor.getRemoteBuckets()), actor.getSelf());
+    }
+
     public final T getLocalData() {
         return getLocalBucket().getData();
     }
@@ -146,18 +154,17 @@ public abstract class BucketStoreActor<T extends BucketData<T>> extends
             return;
         }
 
-        if (message instanceof ExecuteInActor) {
-            ((ExecuteInActor) message).accept(this);
+        if (message instanceof ExecuteInActor execute) {
+            execute.accept(this);
         } else if (GET_BUCKET_VERSIONS == message) {
             // FIXME: do we need to send ourselves?
             getSender().tell(ImmutableMap.copyOf(versions), getSelf());
-        } else if (message instanceof Terminated) {
-            actorTerminated((Terminated) message);
-        } else if (message instanceof DeleteSnapshotsSuccess) {
-            LOG.debug("{}: got command: {}", persistenceId(), message);
-        } else if (message instanceof DeleteSnapshotsFailure) {
-            LOG.warn("{}: failed to delete prior snapshots", persistenceId(),
-                ((DeleteSnapshotsFailure) message).cause());
+        } else if (message instanceof Terminated terminated) {
+            actorTerminated(terminated);
+        } else if (message instanceof DeleteSnapshotsSuccess deleteSuccess) {
+            LOG.debug("{}: got command: {}", persistenceId(), deleteSuccess);
+        } else if (message instanceof DeleteSnapshotsFailure deleteFailure) {
+            LOG.warn("{}: failed to delete prior snapshots", persistenceId(), deleteFailure.cause());
         } else {
             LOG.debug("Unhandled message [{}]", message);
             unhandled(message);
@@ -165,15 +172,14 @@ public abstract class BucketStoreActor<T extends BucketData<T>> extends
     }
 
     private void handleSnapshotMessage(final Object message) {
-        if (message instanceof SaveSnapshotFailure) {
-            LOG.error("{}: failed to persist state", persistenceId(), ((SaveSnapshotFailure) message).cause());
+        if (message instanceof SaveSnapshotFailure saveFailure) {
+            LOG.error("{}: failed to persist state", persistenceId(), saveFailure.cause());
             persisting = false;
             self().tell(PoisonPill.getInstance(), ActorRef.noSender());
-        } else if (message instanceof SaveSnapshotSuccess) {
-            LOG.debug("{}: got command: {}", persistenceId(), message);
-            SaveSnapshotSuccess saved = (SaveSnapshotSuccess)message;
-            deleteSnapshots(new SnapshotSelectionCriteria(saved.metadata().sequenceNr(),
-                    saved.metadata().timestamp() - 1, 0L, 0L));
+        } else if (message instanceof SaveSnapshotSuccess saveSuccess) {
+            LOG.debug("{}: got command: {}", persistenceId(), saveSuccess);
+            deleteSnapshots(new SnapshotSelectionCriteria(scala.Long.MaxValue(), saveSuccess.metadata().timestamp() - 1,
+                0L, 0L));
             persisting = false;
             unstash();
         } else {
@@ -183,7 +189,7 @@ public abstract class BucketStoreActor<T extends BucketData<T>> extends
     }
 
     @Override
-    protected final void handleRecover(final Object message) throws Exception {
+    protected final void handleRecover(final Object message) {
         if (message instanceof RecoveryCompleted) {
             if (incarnation != null) {
                 incarnation = incarnation + 1;
@@ -191,20 +197,20 @@ public abstract class BucketStoreActor<T extends BucketData<T>> extends
                 incarnation = 0;
             }
 
-            this.localBucket = new LocalBucket<>(incarnation.intValue(), initialData);
+            this.localBucket = new LocalBucket<>(incarnation, initialData);
             initialData = null;
             LOG.debug("{}: persisting new incarnation {}", persistenceId(), incarnation);
             persisting = true;
             saveSnapshot(incarnation);
-        } else if (message instanceof SnapshotOffer) {
-            incarnation = (Integer) ((SnapshotOffer)message).snapshot();
+        } else if (message instanceof SnapshotOffer snapshotOffer) {
+            incarnation = (Integer) snapshotOffer.snapshot();
             LOG.debug("{}: recovered incarnation {}", persistenceId(), incarnation);
         } else {
             LOG.warn("{}: ignoring recovery message {}", persistenceId(), message);
         }
     }
 
-    protected final RemoteRpcProviderConfig getConfig() {
+    protected final RemoteOpsProviderConfig getConfig() {
         return config;
     }
 
@@ -216,7 +222,7 @@ public abstract class BucketStoreActor<T extends BucketData<T>> extends
         if (bumpIncarnation) {
             LOG.debug("Version wrapped. incrementing incarnation");
 
-            Verify.verify(incarnation < Integer.MAX_VALUE, "Ran out of incarnations, cannot continue");
+            verify(incarnation < Integer.MAX_VALUE, "Ran out of incarnations, cannot continue");
             incarnation = incarnation + 1;
 
             persisting = true;
@@ -230,14 +236,14 @@ public abstract class BucketStoreActor<T extends BucketData<T>> extends
      * @param address Remote address
      * @param bucket Bucket removed
      */
-    protected abstract void onBucketRemoved(final Address address, final Bucket<T> bucket);
+    protected abstract void onBucketRemoved(Address address, Bucket<T> bucket);
 
     /**
      * Callback to subclasses invoked when the set of remote buckets is updated.
      *
      * @param newBuckets Map of address to new bucket. Never null, but can be empty.
      */
-    protected abstract void onBucketsUpdated(final Map<Address, Bucket<T>> newBuckets);
+    protected abstract void onBucketsUpdated(Map<Address, Bucket<T>> newBuckets);
 
     /**
      * Helper to collect all known buckets.
@@ -285,6 +291,7 @@ public abstract class BucketStoreActor<T extends BucketData<T>> extends
             bucket.getWatchActor().ifPresent(ref -> removeWatch(addr, ref));
             onBucketRemoved(addr, bucket);
         }
+        versions.remove(addr);
     }
 
     /**
@@ -368,7 +375,7 @@ public abstract class BucketStoreActor<T extends BucketData<T>> extends
             versions.remove(addr);
             final Bucket<T> bucket = remoteBuckets.remove(addr);
             if (bucket != null) {
-                LOG.debug("Source actor dead, removing bucket {} from ", bucket, addr);
+                LOG.debug("Source actor dead, removing bucket {} from {}", bucket, addr);
                 onBucketRemoved(addr, bucket);
             }
         }
@@ -380,7 +387,7 @@ public abstract class BucketStoreActor<T extends BucketData<T>> extends
     }
 
     private LocalBucket<T> getLocalBucket() {
-        Preconditions.checkState(localBucket != null, "Attempted to access local bucket before recovery completed");
+        checkState(localBucket != null, "Attempted to access local bucket before recovery completed");
         return localBucket;
     }
 }