Fixup checkstyle
[controller.git] / opendaylight / md-sal / sal-clustering-commons / src / main / java / org / opendaylight / controller / cluster / messaging / MessageSlicer.java
1 /*
2  * Copyright (c) 2017 Inocybe Technologies and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.messaging;
9
10 import static com.google.common.base.Preconditions.checkArgument;
11 import static java.util.Objects.requireNonNull;
12
13 import akka.actor.ActorRef;
14 import com.google.common.annotations.VisibleForTesting;
15 import com.google.common.cache.Cache;
16 import com.google.common.cache.CacheBuilder;
17 import com.google.common.cache.RemovalNotification;
18 import java.io.IOException;
19 import java.io.ObjectOutputStream;
20 import java.io.Serializable;
21 import java.util.Optional;
22 import java.util.concurrent.TimeUnit;
23 import java.util.concurrent.atomic.AtomicLong;
24 import java.util.function.Predicate;
25 import org.eclipse.jdt.annotation.NonNull;
26 import org.opendaylight.controller.cluster.io.FileBackedOutputStream;
27 import org.opendaylight.controller.cluster.io.FileBackedOutputStreamFactory;
28 import org.opendaylight.yangtools.concepts.Identifier;
29 import org.slf4j.Logger;
30 import org.slf4j.LoggerFactory;
31
32 /**
33  * This class slices messages into smaller chunks. {@link MessageAssembler} is used to re-assemble the messages.
34  *
35  * @author Thomas Pantelis
36  * @see MessageAssembler
37  */
38 public class MessageSlicer implements AutoCloseable {
39     private static final Logger LOG = LoggerFactory.getLogger(MessageSlicer.class);
40     private static final AtomicLong SLICER_ID_COUNTER = new AtomicLong(1);
41     public static final int DEFAULT_MAX_SLICING_TRIES = 3;
42
43     private final Cache<MessageSliceIdentifier, SlicedMessageState<ActorRef>> stateCache;
44     private final FileBackedOutputStreamFactory fileBackedStreamFactory;
45     private final int messageSliceSize;
46     private final int maxSlicingTries;
47     private final String logContext;
48     private final long id;
49
50     MessageSlicer(final Builder builder) {
51         fileBackedStreamFactory = builder.fileBackedStreamFactory;
52         messageSliceSize = builder.messageSliceSize;
53         maxSlicingTries = builder.maxSlicingTries;
54
55         id = SLICER_ID_COUNTER.getAndIncrement();
56         logContext = builder.logContext + "_slicer-id-" + id;
57
58         CacheBuilder<Identifier, SlicedMessageState<ActorRef>> cacheBuilder =
59                 CacheBuilder.newBuilder().removalListener(this::stateRemoved);
60         if (builder.expireStateAfterInactivityDuration > 0) {
61             cacheBuilder = cacheBuilder.expireAfterAccess(builder.expireStateAfterInactivityDuration,
62                     builder.expireStateAfterInactivityUnit);
63         }
64         stateCache = cacheBuilder.build();
65     }
66
67     @VisibleForTesting
68     long getId() {
69         return id;
70     }
71
72     /**
73      * Returns a new Builder for creating MessageSlicer instances.
74      *
75      * @return a Builder instance
76      */
77     public static Builder builder() {
78         return new Builder();
79     }
80
81     /**
82      * Checks if the given message is handled by this class. If so, it should be forwarded to the
83      * {@link #handleMessage(Object)} method
84      *
85      * @param message the message to check
86      * @return true if handled, false otherwise
87      */
88     public static boolean isHandledMessage(final Object message) {
89         return message instanceof MessageSliceReply;
90     }
91
92     /**
93      * Slices a message into chunks based on the serialized size, the maximum message slice size and the given
94      * options.
95      *
96      * @param options the SliceOptions
97      * @return true if the message was sliced, false otherwise
98      */
99     public boolean slice(final SliceOptions options) {
100         final Identifier identifier = options.getIdentifier();
101         final Serializable message = options.getMessage();
102         final FileBackedOutputStream fileBackedStream;
103         if (message != null) {
104             LOG.debug("{}: slice: identifier: {}, message: {}", logContext, identifier, message);
105
106             requireNonNull(fileBackedStreamFactory,
107                     "The FiledBackedStreamFactory must be set in order to call this slice method");
108
109             // Serialize the message to a FileBackedOutputStream.
110             fileBackedStream = fileBackedStreamFactory.newInstance();
111             try (ObjectOutputStream out = new ObjectOutputStream(fileBackedStream)) {
112                 out.writeObject(message);
113             } catch (IOException e) {
114                 LOG.debug("{}: Error serializing message for {}", logContext, identifier, e);
115                 fileBackedStream.cleanup();
116                 options.getOnFailureCallback().accept(e);
117                 return false;
118             }
119         } else {
120             fileBackedStream = options.getFileBackedStream();
121         }
122
123         return initializeSlicing(options, fileBackedStream);
124     }
125
126     private boolean initializeSlicing(final SliceOptions options, final FileBackedOutputStream fileBackedStream) {
127         final Identifier identifier = options.getIdentifier();
128         MessageSliceIdentifier messageSliceId = new MessageSliceIdentifier(identifier, id);
129         SlicedMessageState<ActorRef> state = null;
130         try {
131             state = new SlicedMessageState<>(messageSliceId, fileBackedStream, messageSliceSize, maxSlicingTries,
132                     options.getReplyTo(), options.getOnFailureCallback(), logContext);
133
134             final Serializable message = options.getMessage();
135             if (state.getTotalSlices() == 1 && message != null) {
136                 LOG.debug("{}: Message does not need to be sliced - sending original message", logContext);
137                 state.close();
138                 sendTo(options, message, options.getReplyTo());
139                 return false;
140             }
141
142             final MessageSlice firstSlice = getNextSliceMessage(state);
143
144             LOG.debug("{}: Sending first slice: {}", logContext, firstSlice);
145
146             stateCache.put(messageSliceId, state);
147             sendTo(options, firstSlice, ActorRef.noSender());
148             return true;
149         } catch (IOException e) {
150             LOG.error("{}: Error initializing SlicedMessageState for {}", logContext, identifier, e);
151             if (state != null) {
152                 state.close();
153             } else {
154                 fileBackedStream.cleanup();
155             }
156
157             options.getOnFailureCallback().accept(e);
158             return false;
159         }
160     }
161
162     private static void sendTo(final SliceOptions options, final Object message, final ActorRef sender) {
163         if (options.getSendToRef() != null) {
164             options.getSendToRef().tell(message, sender);
165         } else {
166             options.getSendToSelection().tell(message, sender);
167         }
168     }
169
170     /**
171      * Invoked to handle messages pertaining to this class.
172      *
173      * @param message the message
174      * @return true if the message was handled, false otherwise
175      */
176     public boolean handleMessage(final Object message) {
177         if (message instanceof MessageSliceReply sliceReply) {
178             LOG.debug("{}: handleMessage: {}", logContext, sliceReply);
179             return onMessageSliceReply(sliceReply);
180         }
181
182         return false;
183     }
184
185     /**
186      * Checks for and removes sliced message state that has expired due to inactivity from the assembling component
187      * on the other end.
188      */
189     public void checkExpiredSlicedMessageState() {
190         if (stateCache.size() > 0) {
191             stateCache.cleanUp();
192         }
193     }
194
195     /**
196      * Closes and removes all in-progress sliced message state.
197      */
198     @Override
199     public void close() {
200         LOG.debug("{}: Closing", logContext);
201         stateCache.invalidateAll();
202     }
203
204     /**
205      * Cancels all in-progress sliced message state that matches the given filter.
206      *
207      * @param filter filters by Identifier
208      */
209     public void cancelSlicing(final @NonNull Predicate<Identifier> filter) {
210         stateCache.asMap().keySet().removeIf(
211             messageSliceIdentifier -> filter.test(messageSliceIdentifier.getClientIdentifier()));
212     }
213
214     private static MessageSlice getNextSliceMessage(final SlicedMessageState<ActorRef> state) throws IOException {
215         final byte[] firstSliceBytes = state.getNextSlice();
216         return new MessageSlice(state.getIdentifier(), firstSliceBytes, state.getCurrentSliceIndex(),
217                 state.getTotalSlices(), state.getLastSliceHashCode(), state.getReplyTarget());
218     }
219
220     private boolean onMessageSliceReply(final MessageSliceReply reply) {
221         final Identifier identifier = reply.getIdentifier();
222         if (!(identifier instanceof MessageSliceIdentifier sliceIdentifier) || sliceIdentifier.getSlicerId() != id) {
223             return false;
224         }
225
226         final SlicedMessageState<ActorRef> state = stateCache.getIfPresent(identifier);
227         if (state == null) {
228             LOG.warn("{}: SlicedMessageState not found for {}", logContext, reply);
229             reply.getSendTo().tell(new AbortSlicing(identifier), ActorRef.noSender());
230             return true;
231         }
232
233         synchronized (state) {
234             try {
235                 final Optional<MessageSliceException> failure = reply.getFailure();
236                 if (failure.isPresent()) {
237                     LOG.warn("{}: Received failed {}", logContext, reply);
238                     processMessageSliceException(failure.orElseThrow(), state, reply.getSendTo());
239                     return true;
240                 }
241
242                 if (state.getCurrentSliceIndex() != reply.getSliceIndex()) {
243                     LOG.warn("{}: Slice index {} in {} does not match expected index {}", logContext,
244                             reply.getSliceIndex(), reply, state.getCurrentSliceIndex());
245                     reply.getSendTo().tell(new AbortSlicing(identifier), ActorRef.noSender());
246                     possiblyRetrySlicing(state, reply.getSendTo());
247                     return true;
248                 }
249
250                 if (state.isLastSlice(reply.getSliceIndex())) {
251                     LOG.debug("{}: Received last slice reply for {}", logContext, identifier);
252                     removeState(identifier);
253                 } else {
254                     final MessageSlice nextSlice = getNextSliceMessage(state);
255                     LOG.debug("{}: Sending next slice: {}", logContext, nextSlice);
256                     reply.getSendTo().tell(nextSlice, ActorRef.noSender());
257                 }
258             } catch (IOException e) {
259                 LOG.warn("{}: Error processing {}", logContext, reply, e);
260                 fail(state, e);
261             }
262         }
263
264         return true;
265     }
266
267     private void processMessageSliceException(final MessageSliceException exception,
268             final SlicedMessageState<ActorRef> state, final ActorRef sendTo) throws IOException {
269         if (exception.isRetriable()) {
270             possiblyRetrySlicing(state, sendTo);
271         } else {
272             fail(state, exception.getCause() != null ? exception.getCause() : exception);
273         }
274     }
275
276     private void possiblyRetrySlicing(final SlicedMessageState<ActorRef> state, final ActorRef sendTo)
277             throws IOException {
278         if (state.canRetry()) {
279             LOG.info("{}: Retrying message slicing for {}", logContext, state.getIdentifier());
280             state.reset();
281             sendTo.tell(getNextSliceMessage(state), ActorRef.noSender());
282         } else {
283             String message = String.format("Maximum slicing retries reached for identifier %s - failing the message",
284                     state.getIdentifier());
285             LOG.warn(message);
286             fail(state, new RuntimeException(message));
287         }
288     }
289
290     private void removeState(final Identifier identifier) {
291         LOG.debug("{}: Removing state for {}", logContext, identifier);
292         stateCache.invalidate(identifier);
293     }
294
295     private void stateRemoved(final RemovalNotification<Identifier, SlicedMessageState<ActorRef>> notification) {
296         final SlicedMessageState<ActorRef> state = notification.getValue();
297         state.close();
298         if (notification.wasEvicted()) {
299             LOG.warn("{}: SlicedMessageState for {} was expired from the cache", logContext, notification.getKey());
300             state.getOnFailureCallback().accept(new RuntimeException(String.format(
301                     "The slicing state for message identifier %s was expired due to inactivity from the assembling "
302                      + "component on the other end", state.getIdentifier())));
303         } else {
304             LOG.debug("{}: SlicedMessageState for {} was removed from the cache due to {}", logContext,
305                     notification.getKey(), notification.getCause());
306         }
307     }
308
309     private void fail(final SlicedMessageState<ActorRef> state, final Throwable failure) {
310         removeState(state.getIdentifier());
311         state.getOnFailureCallback().accept(failure);
312     }
313
314     @VisibleForTesting
315     boolean hasState(final Identifier forIdentifier) {
316         boolean exists = stateCache.getIfPresent(forIdentifier) != null;
317         stateCache.cleanUp();
318         return exists;
319     }
320
321     public static class Builder {
322         private FileBackedOutputStreamFactory fileBackedStreamFactory;
323         private int messageSliceSize = -1;
324         private long expireStateAfterInactivityDuration = -1;
325         private TimeUnit expireStateAfterInactivityUnit = TimeUnit.MINUTES;
326         private int maxSlicingTries = DEFAULT_MAX_SLICING_TRIES;
327         private String logContext = "<no-context>";
328
329         /**
330          * Sets the factory for creating FileBackedOutputStream instances used for streaming messages. This factory
331          * is used by the {@link MessageSlicer#slice(SliceOptions)} method if a Serializable message is passed.
332          * If Serializable messages aren't passed then the factory need not be set.
333          *
334          * @param newFileBackedStreamFactory the factory for creating FileBackedOutputStream instances
335          * @return this Builder
336          */
337         public Builder fileBackedStreamFactory(final FileBackedOutputStreamFactory newFileBackedStreamFactory) {
338             fileBackedStreamFactory = requireNonNull(newFileBackedStreamFactory);
339             return this;
340         }
341
342         /**
343          * Sets the maximum size (in bytes) for a message slice.
344          *
345          * @param newMessageSliceSize the maximum size (in bytes)
346          * @return this Builder
347          */
348         public Builder messageSliceSize(final int newMessageSliceSize) {
349             checkArgument(newMessageSliceSize > 0, "messageSliceSize must be > 0");
350             messageSliceSize = newMessageSliceSize;
351             return this;
352         }
353
354         /**
355          * Sets the maximum number of tries for slicing a message. If exceeded, slicing fails. The default is
356          * defined by {@link #DEFAULT_MAX_SLICING_TRIES}
357          *
358          * @param newMaxSlicingTries the maximum number of tries
359          * @return this Builder
360          */
361         public Builder maxSlicingTries(final int newMaxSlicingTries) {
362             checkArgument(newMaxSlicingTries > 0, "newMaxSlicingTries must be > 0");
363             maxSlicingTries = newMaxSlicingTries;
364             return this;
365         }
366
367         /**
368          * Sets the duration and time unit whereby sliced message state is purged from the cache and the associated
369          * failure callback is notified due to inactivity from the assembling component on the other end. By default,
370          * state is not purged due to inactivity.
371          *
372          * @param duration the length of time after which a state entry is purged
373          * @param unit the unit the duration is expressed in
374          * @return this Builder
375          */
376         public Builder expireStateAfterInactivity(final long duration, final TimeUnit unit) {
377             checkArgument(duration > 0, "duration must be > 0");
378             expireStateAfterInactivityDuration = duration;
379             expireStateAfterInactivityUnit = unit;
380             return this;
381         }
382
383         /**
384          * Sets the context for log messages.
385          *
386          * @param newLogContext the log context
387          * @return this Builder
388          */
389         public Builder logContext(final String newLogContext) {
390             logContext = requireNonNull(newLogContext);
391             return this;
392         }
393
394         /**
395          * Builds a new MessageSlicer instance.
396          *
397          * @return a new MessageSlicer
398          */
399         public MessageSlicer build() {
400             return new MessageSlicer(this);
401         }
402     }
403 }