Simplify code using Java 8 features
[controller.git] / opendaylight / md-sal / sal-clustering-commons / src / main / java / org / opendaylight / controller / cluster / messaging / MessageSlicer.java
1 /*
2  * Copyright (c) 2017 Inocybe Technologies and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.messaging;
9
10 import akka.actor.ActorRef;
11 import com.google.common.annotations.VisibleForTesting;
12 import com.google.common.base.Preconditions;
13 import com.google.common.cache.Cache;
14 import com.google.common.cache.CacheBuilder;
15 import com.google.common.cache.RemovalNotification;
16 import java.io.IOException;
17 import java.io.ObjectOutputStream;
18 import java.io.Serializable;
19 import java.util.Optional;
20 import java.util.concurrent.TimeUnit;
21 import java.util.concurrent.atomic.AtomicLong;
22 import java.util.function.Predicate;
23 import javax.annotation.Nonnull;
24 import org.opendaylight.controller.cluster.io.FileBackedOutputStream;
25 import org.opendaylight.controller.cluster.io.FileBackedOutputStreamFactory;
26 import org.opendaylight.yangtools.concepts.Identifier;
27 import org.slf4j.Logger;
28 import org.slf4j.LoggerFactory;
29
30 /**
31  * This class slices messages into smaller chunks. {@link MessageAssembler} is used to re-assemble the messages.
32  *
33  * @author Thomas Pantelis
34  * @see MessageAssembler
35  */
36 public class MessageSlicer implements AutoCloseable {
37     private static final Logger LOG = LoggerFactory.getLogger(MessageSlicer.class);
38     private static final AtomicLong SLICER_ID_COUNTER = new AtomicLong(1);
39     public static final int DEFAULT_MAX_SLICING_TRIES = 3;
40
41     private final Cache<MessageSliceIdentifier, SlicedMessageState<ActorRef>> stateCache;
42     private final FileBackedOutputStreamFactory fileBackedStreamFactory;
43     private final int messageSliceSize;
44     private final int maxSlicingTries;
45     private final String logContext;
46     private final long id;
47
48     MessageSlicer(final Builder builder) {
49         this.fileBackedStreamFactory = builder.fileBackedStreamFactory;
50         this.messageSliceSize = builder.messageSliceSize;
51         this.maxSlicingTries = builder.maxSlicingTries;
52
53         id = SLICER_ID_COUNTER.getAndIncrement();
54         this.logContext = builder.logContext + "_slicer-id-" + id;
55
56         CacheBuilder<Identifier, SlicedMessageState<ActorRef>> cacheBuilder =
57                 CacheBuilder.newBuilder().removalListener(this::stateRemoved);
58         if (builder.expireStateAfterInactivityDuration > 0) {
59             cacheBuilder = cacheBuilder.expireAfterAccess(builder.expireStateAfterInactivityDuration,
60                     builder.expireStateAfterInactivityUnit);
61         }
62         stateCache = cacheBuilder.build();
63     }
64
65     @VisibleForTesting
66     long getId() {
67         return id;
68     }
69
70     /**
71      * Returns a new Builder for creating MessageSlicer instances.
72      *
73      * @return a Builder instance
74      */
75     public static Builder builder() {
76         return new Builder();
77     }
78
79     /**
80      * Checks if the given message is handled by this class. If so, it should be forwarded to the
81      * {@link #handleMessage(Object)} method
82      *
83      * @param message the message to check
84      * @return true if handled, false otherwise
85      */
86     public static boolean isHandledMessage(final Object message) {
87         return message instanceof MessageSliceReply;
88     }
89
90     /**
91      * Slices a message into chunks based on the serialized size, the maximum message slice size and the given
92      * options.
93      *
94      * @param options the SliceOptions
95      * @return true if the message was sliced, false otherwise
96      */
97     public boolean slice(final SliceOptions options) {
98         final Identifier identifier = options.getIdentifier();
99         final Serializable message = options.getMessage();
100         final FileBackedOutputStream fileBackedStream;
101         if (message != null) {
102             LOG.debug("{}: slice: identifier: {}, message: {}", logContext, identifier, message);
103
104             Preconditions.checkNotNull(fileBackedStreamFactory,
105                     "The FiledBackedStreamFactory must be set in order to call this slice method");
106
107             // Serialize the message to a FileBackedOutputStream.
108             fileBackedStream = fileBackedStreamFactory.newInstance();
109             try (ObjectOutputStream out = new ObjectOutputStream(fileBackedStream)) {
110                 out.writeObject(message);
111             } catch (IOException e) {
112                 LOG.debug("{}: Error serializing message for {}", logContext, identifier, e);
113                 fileBackedStream.cleanup();
114                 options.getOnFailureCallback().accept(e);
115                 return false;
116             }
117         } else {
118             fileBackedStream = options.getFileBackedStream();
119         }
120
121         return initializeSlicing(options, fileBackedStream);
122     }
123
124     private boolean initializeSlicing(final SliceOptions options, final FileBackedOutputStream fileBackedStream) {
125         final Identifier identifier = options.getIdentifier();
126         MessageSliceIdentifier messageSliceId = new MessageSliceIdentifier(identifier, id);
127         SlicedMessageState<ActorRef> state = null;
128         try {
129             state = new SlicedMessageState<>(messageSliceId, fileBackedStream, messageSliceSize, maxSlicingTries,
130                     options.getReplyTo(), options.getOnFailureCallback(), logContext);
131
132             final Serializable message = options.getMessage();
133             if (state.getTotalSlices() == 1 && message != null) {
134                 LOG.debug("{}: Message does not need to be sliced - sending original message", logContext);
135                 state.close();
136                 sendTo(options, message, options.getReplyTo());
137                 return false;
138             }
139
140             final MessageSlice firstSlice = getNextSliceMessage(state);
141
142             LOG.debug("{}: Sending first slice: {}", logContext, firstSlice);
143
144             stateCache.put(messageSliceId, state);
145             sendTo(options, firstSlice, ActorRef.noSender());
146             return true;
147         } catch (IOException e) {
148             LOG.error("{}: Error initializing SlicedMessageState for {}", logContext, identifier, e);
149             if (state != null) {
150                 state.close();
151             } else {
152                 fileBackedStream.cleanup();
153             }
154
155             options.getOnFailureCallback().accept(e);
156             return false;
157         }
158     }
159
160     private static void sendTo(final SliceOptions options, final Object message, final ActorRef sender) {
161         if (options.getSendToRef() != null) {
162             options.getSendToRef().tell(message, sender);
163         } else {
164             options.getSendToSelection().tell(message, sender);
165         }
166     }
167
168     /**
169      * Invoked to handle messages pertaining to this class.
170      *
171      * @param message the message
172      * @return true if the message was handled, false otherwise
173      */
174     public boolean handleMessage(final Object message) {
175         if (message instanceof MessageSliceReply) {
176             LOG.debug("{}: handleMessage: {}", logContext, message);
177             return onMessageSliceReply((MessageSliceReply) message);
178         }
179
180         return false;
181     }
182
183     /**
184      * Checks for and removes sliced message state that has expired due to inactivity from the assembling component
185      * on the other end.
186      */
187     public void checkExpiredSlicedMessageState() {
188         if (stateCache.size() > 0) {
189             stateCache.cleanUp();
190         }
191     }
192
193     /**
194      * Closes and removes all in-progress sliced message state.
195      */
196     @Override
197     public void close() {
198         LOG.debug("{}: Closing", logContext);
199         stateCache.invalidateAll();
200     }
201
202     /**
203      * Cancels all in-progress sliced message state that matches the given filter.
204      *
205      * @param filter filters by Identifier
206      */
207     public void cancelSlicing(@Nonnull final Predicate<Identifier> filter) {
208         stateCache.asMap().keySet().removeIf(
209             messageSliceIdentifier -> filter.test(messageSliceIdentifier.getClientIdentifier()));
210     }
211
212     private static MessageSlice getNextSliceMessage(final SlicedMessageState<ActorRef> state) throws IOException {
213         final byte[] firstSliceBytes = state.getNextSlice();
214         return new MessageSlice(state.getIdentifier(), firstSliceBytes, state.getCurrentSliceIndex(),
215                 state.getTotalSlices(), state.getLastSliceHashCode(), state.getReplyTarget());
216     }
217
218     private boolean onMessageSliceReply(final MessageSliceReply reply) {
219         final Identifier identifier = reply.getIdentifier();
220         if (!(identifier instanceof MessageSliceIdentifier)
221                 || ((MessageSliceIdentifier)identifier).getSlicerId() != id) {
222             return false;
223         }
224
225         final SlicedMessageState<ActorRef> state = stateCache.getIfPresent(identifier);
226         if (state == null) {
227             LOG.warn("{}: SlicedMessageState not found for {}", logContext, reply);
228             reply.getSendTo().tell(new AbortSlicing(identifier), ActorRef.noSender());
229             return true;
230         }
231
232         synchronized (state) {
233             try {
234                 final Optional<MessageSliceException> failure = reply.getFailure();
235                 if (failure.isPresent()) {
236                     LOG.warn("{}: Received failed {}", logContext, reply);
237                     processMessageSliceException(failure.get(), state, reply.getSendTo());
238                     return true;
239                 }
240
241                 if (state.getCurrentSliceIndex() != reply.getSliceIndex()) {
242                     LOG.warn("{}: Slice index {} in {} does not match expected index {}", logContext,
243                             reply.getSliceIndex(), reply, state.getCurrentSliceIndex());
244                     reply.getSendTo().tell(new AbortSlicing(identifier), ActorRef.noSender());
245                     possiblyRetrySlicing(state, reply.getSendTo());
246                     return true;
247                 }
248
249                 if (state.isLastSlice(reply.getSliceIndex())) {
250                     LOG.debug("{}: Received last slice reply for {}", logContext, identifier);
251                     removeState(identifier);
252                 } else {
253                     final MessageSlice nextSlice = getNextSliceMessage(state);
254                     LOG.debug("{}: Sending next slice: {}", logContext, nextSlice);
255                     reply.getSendTo().tell(nextSlice, ActorRef.noSender());
256                 }
257             } catch (IOException e) {
258                 LOG.warn("{}: Error processing {}", logContext, reply, e);
259                 fail(state, e);
260             }
261         }
262
263         return true;
264     }
265
266     private void processMessageSliceException(final MessageSliceException exception,
267             final SlicedMessageState<ActorRef> state, final ActorRef sendTo) throws IOException {
268         if (exception.isRetriable()) {
269             possiblyRetrySlicing(state, sendTo);
270         } else {
271             fail(state, exception.getCause() != null ? exception.getCause() : exception);
272         }
273     }
274
275     private void possiblyRetrySlicing(final SlicedMessageState<ActorRef> state, final ActorRef sendTo)
276             throws IOException {
277         if (state.canRetry()) {
278             LOG.info("{}: Retrying message slicing for {}", logContext, state.getIdentifier());
279             state.reset();
280             sendTo.tell(getNextSliceMessage(state), ActorRef.noSender());
281         } else {
282             String message = String.format("Maximum slicing retries reached for identifier %s - failing the message",
283                     state.getIdentifier());
284             LOG.warn(message);
285             fail(state, new RuntimeException(message));
286         }
287     }
288
289     private void removeState(final Identifier identifier) {
290         LOG.debug("{}: Removing state for {}", logContext, identifier);
291         stateCache.invalidate(identifier);
292     }
293
294     private void stateRemoved(final RemovalNotification<Identifier, SlicedMessageState<ActorRef>> notification) {
295         final SlicedMessageState<ActorRef> state = notification.getValue();
296         state.close();
297         if (notification.wasEvicted()) {
298             LOG.warn("{}: SlicedMessageState for {} was expired from the cache", logContext, notification.getKey());
299             state.getOnFailureCallback().accept(new RuntimeException(String.format(
300                     "The slicing state for message identifier %s was expired due to inactivity from the assembling "
301                      + "component on the other end", state.getIdentifier())));
302         } else {
303             LOG.debug("{}: SlicedMessageState for {} was removed from the cache due to {}", logContext,
304                     notification.getKey(), notification.getCause());
305         }
306     }
307
308     private void fail(final SlicedMessageState<ActorRef> state, final Throwable failure) {
309         removeState(state.getIdentifier());
310         state.getOnFailureCallback().accept(failure);
311     }
312
313     @VisibleForTesting
314     boolean hasState(final Identifier forIdentifier) {
315         boolean exists = stateCache.getIfPresent(forIdentifier) != null;
316         stateCache.cleanUp();
317         return exists;
318     }
319
320     public static class Builder {
321         private FileBackedOutputStreamFactory fileBackedStreamFactory;
322         private int messageSliceSize = -1;
323         private long expireStateAfterInactivityDuration = -1;
324         private TimeUnit expireStateAfterInactivityUnit = TimeUnit.MINUTES;
325         private int maxSlicingTries = DEFAULT_MAX_SLICING_TRIES;
326         private String logContext = "<no-context>";
327
328         /**
329          * Sets the factory for creating FileBackedOutputStream instances used for streaming messages. This factory
330          * is used by the {@link MessageSlicer#slice(SliceOptions)} method if a Serializable message is passed.
331          * If Serializable messages aren't passed then the factory need not be set.
332          *
333          * @param newFileBackedStreamFactory the factory for creating FileBackedOutputStream instances
334          * @return this Builder
335          */
336         public Builder fileBackedStreamFactory(final FileBackedOutputStreamFactory newFileBackedStreamFactory) {
337             this.fileBackedStreamFactory = Preconditions.checkNotNull(newFileBackedStreamFactory);
338             return this;
339         }
340
341         /**
342          * Sets the maximum size (in bytes) for a message slice.
343          *
344          * @param newMessageSliceSize the maximum size (in bytes)
345          * @return this Builder
346          */
347         public Builder messageSliceSize(final int newMessageSliceSize) {
348             Preconditions.checkArgument(newMessageSliceSize > 0, "messageSliceSize must be > 0");
349             this.messageSliceSize = newMessageSliceSize;
350             return this;
351         }
352
353         /**
354          * Sets the maximum number of tries for slicing a message. If exceeded, slicing fails. The default is
355          * defined by {@link #DEFAULT_MAX_SLICING_TRIES}
356          *
357          * @param newMaxSlicingTries the maximum number of tries
358          * @return this Builder
359          */
360         public Builder maxSlicingTries(final int newMaxSlicingTries) {
361             Preconditions.checkArgument(newMaxSlicingTries > 0, "newMaxSlicingTries must be > 0");
362             this.maxSlicingTries = newMaxSlicingTries;
363             return this;
364         }
365
366         /**
367          * Sets the duration and time unit whereby sliced message state is purged from the cache and the associated
368          * failure callback is notified due to inactivity from the assembling component on the other end. By default,
369          * state is not purged due to inactivity.
370          *
371          * @param duration the length of time after which a state entry is purged
372          * @param unit the unit the duration is expressed in
373          * @return this Builder
374          */
375         public Builder expireStateAfterInactivity(final long duration, final TimeUnit unit) {
376             Preconditions.checkArgument(duration > 0, "duration must be > 0");
377             this.expireStateAfterInactivityDuration = duration;
378             this.expireStateAfterInactivityUnit = unit;
379             return this;
380         }
381
382         /**
383          * Sets the context for log messages.
384          *
385          * @param newLogContext the log context
386          * @return this Builder
387          */
388         public Builder logContext(final String newLogContext) {
389             this.logContext = Preconditions.checkNotNull(newLogContext);
390             return this;
391         }
392
393         /**
394          * Builds a new MessageSlicer instance.
395          *
396          * @return a new MessageSlicer
397          */
398         public MessageSlicer build() {
399             return new MessageSlicer(this);
400         }
401     }
402 }