Restart downed nodes.
[controller.git] / opendaylight / md-sal / sal-clustering-commons / src / main / java / org / opendaylight / controller / cluster / messaging / MessageAssembler.java
index 328ff0c44a507c9301cbe89f16a18b3ae68668fb..297186d9f7c6bd9f13f8c6bea68ae33274931836 100644 (file)
@@ -7,12 +7,13 @@
  */
 package org.opendaylight.controller.cluster.messaging;
 
+import static com.google.common.base.Preconditions.checkArgument;
+import static java.util.Objects.requireNonNull;
+
 import akka.actor.ActorRef;
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.RemovalListener;
 import com.google.common.cache.RemovalNotification;
 import com.google.common.io.ByteSource;
 import java.io.IOException;
@@ -20,7 +21,7 @@ import java.io.ObjectInputStream;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.function.BiConsumer;
-import javax.annotation.Nonnull;
+import org.eclipse.jdt.annotation.NonNull;
 import org.opendaylight.controller.cluster.io.FileBackedOutputStreamFactory;
 import org.opendaylight.yangtools.concepts.Identifier;
 import org.slf4j.Logger;
@@ -32,7 +33,7 @@ import org.slf4j.LoggerFactory;
  * @author Thomas Pantelis
  * @see MessageSlicer
  */
-public class MessageAssembler implements AutoCloseable {
+public final  class MessageAssembler implements AutoCloseable {
     private static final Logger LOG = LoggerFactory.getLogger(MessageAssembler.class);
 
     private final Cache<Identifier, AssembledMessageState> stateCache;
@@ -40,17 +41,16 @@ public class MessageAssembler implements AutoCloseable {
     private final BiConsumer<Object, ActorRef> assembledMessageCallback;
     private final String logContext;
 
-    private MessageAssembler(Builder builder) {
-        this.fileBackedStreamFactory = Preconditions.checkNotNull(builder.fileBackedStreamFactory,
+    MessageAssembler(final Builder builder) {
+        this.fileBackedStreamFactory = requireNonNull(builder.fileBackedStreamFactory,
                 "FiledBackedStreamFactory cannot be null");
-        this.assembledMessageCallback = Preconditions.checkNotNull(builder.assembledMessageCallback,
+        this.assembledMessageCallback = requireNonNull(builder.assembledMessageCallback,
                 "assembledMessageCallback cannot be null");
         this.logContext = builder.logContext;
 
         stateCache = CacheBuilder.newBuilder()
                 .expireAfterAccess(builder.expireStateAfterInactivityDuration, builder.expireStateAfterInactivityUnit)
-                .removalListener((RemovalListener<Identifier, AssembledMessageState>) notification ->
-                    stateRemoved(notification)).build();
+                .removalListener(this::stateRemoved).build();
     }
 
     /**
@@ -69,7 +69,7 @@ public class MessageAssembler implements AutoCloseable {
      * @param message the message to check
      * @return true if handled, false otherwise
      */
-    public static boolean isHandledMessage(Object message) {
+    public static boolean isHandledMessage(final Object message) {
         return message instanceof MessageSlice || message instanceof AbortSlicing;
     }
 
@@ -96,7 +96,7 @@ public class MessageAssembler implements AutoCloseable {
      * @param sendTo the reference of the actor to which subsequent message slices should be sent
      * @return true if the message was handled, false otherwise
      */
-    public boolean handleMessage(final Object message, final @Nonnull ActorRef sendTo) {
+    public boolean handleMessage(final Object message, final @NonNull ActorRef sendTo) {
         if (message instanceof MessageSlice) {
             LOG.debug("{}: handleMessage: {}", logContext, message);
             onMessageSlice((MessageSlice) message, sendTo);
@@ -144,7 +144,7 @@ public class MessageAssembler implements AutoCloseable {
                 messageSlice.getSliceIndex()), true);
     }
 
-    private void processMessageSliceForState(final MessageSlice messageSlice, AssembledMessageState state,
+    private void processMessageSliceForState(final MessageSlice messageSlice, final AssembledMessageState state,
             final ActorRef sendTo) {
         final Identifier identifier = messageSlice.getIdentifier();
         final ActorRef replyTo = messageSlice.getReplyTo();
@@ -177,7 +177,7 @@ public class MessageAssembler implements AutoCloseable {
         }
     }
 
-    private Object reAssembleMessage(final AssembledMessageState state) throws MessageSliceException {
+    private static Object reAssembleMessage(final AssembledMessageState state) throws MessageSliceException {
         try {
             final ByteSource assembledBytes = state.getAssembledBytes();
             try (ObjectInputStream in = new ObjectInputStream(assembledBytes.openStream())) {
@@ -190,7 +190,7 @@ public class MessageAssembler implements AutoCloseable {
         }
     }
 
-    private void onAbortSlicing(AbortSlicing message) {
+    private void onAbortSlicing(final AbortSlicing message) {
         removeState(message.getIdentifier());
     }
 
@@ -199,7 +199,7 @@ public class MessageAssembler implements AutoCloseable {
         stateCache.invalidate(identifier);
     }
 
-    private void stateRemoved(RemovalNotification<Identifier, AssembledMessageState> notification) {
+    private void stateRemoved(final RemovalNotification<Identifier, AssembledMessageState> notification) {
         if (notification.wasEvicted()) {
             LOG.warn("{}: AssembledMessageState for {} was expired from the cache", logContext, notification.getKey());
         } else {
@@ -211,7 +211,7 @@ public class MessageAssembler implements AutoCloseable {
     }
 
     @VisibleForTesting
-    boolean hasState(Identifier forIdentifier) {
+    boolean hasState(final Identifier forIdentifier) {
         boolean exists = stateCache.getIfPresent(forIdentifier) != null;
         stateCache.cleanUp();
         return exists;
@@ -231,7 +231,7 @@ public class MessageAssembler implements AutoCloseable {
          * @return this Builder
          */
         public Builder fileBackedStreamFactory(final FileBackedOutputStreamFactory newFileBackedStreamFactory) {
-            this.fileBackedStreamFactory = Preconditions.checkNotNull(newFileBackedStreamFactory);
+            this.fileBackedStreamFactory = requireNonNull(newFileBackedStreamFactory);
             return this;
         }
 
@@ -257,7 +257,7 @@ public class MessageAssembler implements AutoCloseable {
          * @return this Builder
          */
         public Builder expireStateAfterInactivity(final long duration, final TimeUnit unit) {
-            Preconditions.checkArgument(duration > 0, "duration must be > 0");
+            checkArgument(duration > 0, "duration must be > 0");
             this.expireStateAfterInactivityDuration = duration;
             this.expireStateAfterInactivityUnit = unit;
             return this;