Reduce JSR305 proliferation
[controller.git] / opendaylight / md-sal / sal-clustering-commons / src / main / java / org / opendaylight / controller / cluster / messaging / MessageSlicer.java
1 /*
2  * Copyright (c) 2017 Inocybe Technologies and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.messaging;
9
10 import static com.google.common.base.Preconditions.checkArgument;
11 import static java.util.Objects.requireNonNull;
12
13 import akka.actor.ActorRef;
14 import com.google.common.annotations.VisibleForTesting;
15 import com.google.common.cache.Cache;
16 import com.google.common.cache.CacheBuilder;
17 import com.google.common.cache.RemovalNotification;
18 import java.io.IOException;
19 import java.io.ObjectOutputStream;
20 import java.io.Serializable;
21 import java.util.Optional;
22 import java.util.concurrent.TimeUnit;
23 import java.util.concurrent.atomic.AtomicLong;
24 import java.util.function.Predicate;
25 import org.eclipse.jdt.annotation.NonNull;
26 import org.opendaylight.controller.cluster.io.FileBackedOutputStream;
27 import org.opendaylight.controller.cluster.io.FileBackedOutputStreamFactory;
28 import org.opendaylight.yangtools.concepts.Identifier;
29 import org.slf4j.Logger;
30 import org.slf4j.LoggerFactory;
31
32 /**
33  * This class slices messages into smaller chunks. {@link MessageAssembler} is used to re-assemble the messages.
34  *
35  * @author Thomas Pantelis
36  * @see MessageAssembler
37  */
38 public class MessageSlicer implements AutoCloseable {
39     private static final Logger LOG = LoggerFactory.getLogger(MessageSlicer.class);
40     private static final AtomicLong SLICER_ID_COUNTER = new AtomicLong(1);
41     public static final int DEFAULT_MAX_SLICING_TRIES = 3;
42
43     private final Cache<MessageSliceIdentifier, SlicedMessageState<ActorRef>> stateCache;
44     private final FileBackedOutputStreamFactory fileBackedStreamFactory;
45     private final int messageSliceSize;
46     private final int maxSlicingTries;
47     private final String logContext;
48     private final long id;
49
50     MessageSlicer(final Builder builder) {
51         this.fileBackedStreamFactory = builder.fileBackedStreamFactory;
52         this.messageSliceSize = builder.messageSliceSize;
53         this.maxSlicingTries = builder.maxSlicingTries;
54
55         id = SLICER_ID_COUNTER.getAndIncrement();
56         this.logContext = builder.logContext + "_slicer-id-" + id;
57
58         CacheBuilder<Identifier, SlicedMessageState<ActorRef>> cacheBuilder =
59                 CacheBuilder.newBuilder().removalListener(this::stateRemoved);
60         if (builder.expireStateAfterInactivityDuration > 0) {
61             cacheBuilder = cacheBuilder.expireAfterAccess(builder.expireStateAfterInactivityDuration,
62                     builder.expireStateAfterInactivityUnit);
63         }
64         stateCache = cacheBuilder.build();
65     }
66
67     @VisibleForTesting
68     long getId() {
69         return id;
70     }
71
72     /**
73      * Returns a new Builder for creating MessageSlicer instances.
74      *
75      * @return a Builder instance
76      */
77     public static Builder builder() {
78         return new Builder();
79     }
80
81     /**
82      * Checks if the given message is handled by this class. If so, it should be forwarded to the
83      * {@link #handleMessage(Object)} method
84      *
85      * @param message the message to check
86      * @return true if handled, false otherwise
87      */
88     public static boolean isHandledMessage(final Object message) {
89         return message instanceof MessageSliceReply;
90     }
91
92     /**
93      * Slices a message into chunks based on the serialized size, the maximum message slice size and the given
94      * options.
95      *
96      * @param options the SliceOptions
97      * @return true if the message was sliced, false otherwise
98      */
99     public boolean slice(final SliceOptions options) {
100         final Identifier identifier = options.getIdentifier();
101         final Serializable message = options.getMessage();
102         final FileBackedOutputStream fileBackedStream;
103         if (message != null) {
104             LOG.debug("{}: slice: identifier: {}, message: {}", logContext, identifier, message);
105
106             requireNonNull(fileBackedStreamFactory,
107                     "The FiledBackedStreamFactory must be set in order to call this slice method");
108
109             // Serialize the message to a FileBackedOutputStream.
110             fileBackedStream = fileBackedStreamFactory.newInstance();
111             try (ObjectOutputStream out = new ObjectOutputStream(fileBackedStream)) {
112                 out.writeObject(message);
113             } catch (IOException e) {
114                 LOG.debug("{}: Error serializing message for {}", logContext, identifier, e);
115                 fileBackedStream.cleanup();
116                 options.getOnFailureCallback().accept(e);
117                 return false;
118             }
119         } else {
120             fileBackedStream = options.getFileBackedStream();
121         }
122
123         return initializeSlicing(options, fileBackedStream);
124     }
125
126     private boolean initializeSlicing(final SliceOptions options, final FileBackedOutputStream fileBackedStream) {
127         final Identifier identifier = options.getIdentifier();
128         MessageSliceIdentifier messageSliceId = new MessageSliceIdentifier(identifier, id);
129         SlicedMessageState<ActorRef> state = null;
130         try {
131             state = new SlicedMessageState<>(messageSliceId, fileBackedStream, messageSliceSize, maxSlicingTries,
132                     options.getReplyTo(), options.getOnFailureCallback(), logContext);
133
134             final Serializable message = options.getMessage();
135             if (state.getTotalSlices() == 1 && message != null) {
136                 LOG.debug("{}: Message does not need to be sliced - sending original message", logContext);
137                 state.close();
138                 sendTo(options, message, options.getReplyTo());
139                 return false;
140             }
141
142             final MessageSlice firstSlice = getNextSliceMessage(state);
143
144             LOG.debug("{}: Sending first slice: {}", logContext, firstSlice);
145
146             stateCache.put(messageSliceId, state);
147             sendTo(options, firstSlice, ActorRef.noSender());
148             return true;
149         } catch (IOException e) {
150             LOG.error("{}: Error initializing SlicedMessageState for {}", logContext, identifier, e);
151             if (state != null) {
152                 state.close();
153             } else {
154                 fileBackedStream.cleanup();
155             }
156
157             options.getOnFailureCallback().accept(e);
158             return false;
159         }
160     }
161
162     private static void sendTo(final SliceOptions options, final Object message, final ActorRef sender) {
163         if (options.getSendToRef() != null) {
164             options.getSendToRef().tell(message, sender);
165         } else {
166             options.getSendToSelection().tell(message, sender);
167         }
168     }
169
170     /**
171      * Invoked to handle messages pertaining to this class.
172      *
173      * @param message the message
174      * @return true if the message was handled, false otherwise
175      */
176     public boolean handleMessage(final Object message) {
177         if (message instanceof MessageSliceReply) {
178             LOG.debug("{}: handleMessage: {}", logContext, message);
179             return onMessageSliceReply((MessageSliceReply) message);
180         }
181
182         return false;
183     }
184
185     /**
186      * Checks for and removes sliced message state that has expired due to inactivity from the assembling component
187      * on the other end.
188      */
189     public void checkExpiredSlicedMessageState() {
190         if (stateCache.size() > 0) {
191             stateCache.cleanUp();
192         }
193     }
194
195     /**
196      * Closes and removes all in-progress sliced message state.
197      */
198     @Override
199     public void close() {
200         LOG.debug("{}: Closing", logContext);
201         stateCache.invalidateAll();
202     }
203
204     /**
205      * Cancels all in-progress sliced message state that matches the given filter.
206      *
207      * @param filter filters by Identifier
208      */
209     public void cancelSlicing(final @NonNull Predicate<Identifier> filter) {
210         stateCache.asMap().keySet().removeIf(
211             messageSliceIdentifier -> filter.test(messageSliceIdentifier.getClientIdentifier()));
212     }
213
214     private static MessageSlice getNextSliceMessage(final SlicedMessageState<ActorRef> state) throws IOException {
215         final byte[] firstSliceBytes = state.getNextSlice();
216         return new MessageSlice(state.getIdentifier(), firstSliceBytes, state.getCurrentSliceIndex(),
217                 state.getTotalSlices(), state.getLastSliceHashCode(), state.getReplyTarget());
218     }
219
220     private boolean onMessageSliceReply(final MessageSliceReply reply) {
221         final Identifier identifier = reply.getIdentifier();
222         if (!(identifier instanceof MessageSliceIdentifier)
223                 || ((MessageSliceIdentifier)identifier).getSlicerId() != id) {
224             return false;
225         }
226
227         final SlicedMessageState<ActorRef> state = stateCache.getIfPresent(identifier);
228         if (state == null) {
229             LOG.warn("{}: SlicedMessageState not found for {}", logContext, reply);
230             reply.getSendTo().tell(new AbortSlicing(identifier), ActorRef.noSender());
231             return true;
232         }
233
234         synchronized (state) {
235             try {
236                 final Optional<MessageSliceException> failure = reply.getFailure();
237                 if (failure.isPresent()) {
238                     LOG.warn("{}: Received failed {}", logContext, reply);
239                     processMessageSliceException(failure.get(), state, reply.getSendTo());
240                     return true;
241                 }
242
243                 if (state.getCurrentSliceIndex() != reply.getSliceIndex()) {
244                     LOG.warn("{}: Slice index {} in {} does not match expected index {}", logContext,
245                             reply.getSliceIndex(), reply, state.getCurrentSliceIndex());
246                     reply.getSendTo().tell(new AbortSlicing(identifier), ActorRef.noSender());
247                     possiblyRetrySlicing(state, reply.getSendTo());
248                     return true;
249                 }
250
251                 if (state.isLastSlice(reply.getSliceIndex())) {
252                     LOG.debug("{}: Received last slice reply for {}", logContext, identifier);
253                     removeState(identifier);
254                 } else {
255                     final MessageSlice nextSlice = getNextSliceMessage(state);
256                     LOG.debug("{}: Sending next slice: {}", logContext, nextSlice);
257                     reply.getSendTo().tell(nextSlice, ActorRef.noSender());
258                 }
259             } catch (IOException e) {
260                 LOG.warn("{}: Error processing {}", logContext, reply, e);
261                 fail(state, e);
262             }
263         }
264
265         return true;
266     }
267
268     private void processMessageSliceException(final MessageSliceException exception,
269             final SlicedMessageState<ActorRef> state, final ActorRef sendTo) throws IOException {
270         if (exception.isRetriable()) {
271             possiblyRetrySlicing(state, sendTo);
272         } else {
273             fail(state, exception.getCause() != null ? exception.getCause() : exception);
274         }
275     }
276
277     private void possiblyRetrySlicing(final SlicedMessageState<ActorRef> state, final ActorRef sendTo)
278             throws IOException {
279         if (state.canRetry()) {
280             LOG.info("{}: Retrying message slicing for {}", logContext, state.getIdentifier());
281             state.reset();
282             sendTo.tell(getNextSliceMessage(state), ActorRef.noSender());
283         } else {
284             String message = String.format("Maximum slicing retries reached for identifier %s - failing the message",
285                     state.getIdentifier());
286             LOG.warn(message);
287             fail(state, new RuntimeException(message));
288         }
289     }
290
291     private void removeState(final Identifier identifier) {
292         LOG.debug("{}: Removing state for {}", logContext, identifier);
293         stateCache.invalidate(identifier);
294     }
295
296     private void stateRemoved(final RemovalNotification<Identifier, SlicedMessageState<ActorRef>> notification) {
297         final SlicedMessageState<ActorRef> state = notification.getValue();
298         state.close();
299         if (notification.wasEvicted()) {
300             LOG.warn("{}: SlicedMessageState for {} was expired from the cache", logContext, notification.getKey());
301             state.getOnFailureCallback().accept(new RuntimeException(String.format(
302                     "The slicing state for message identifier %s was expired due to inactivity from the assembling "
303                      + "component on the other end", state.getIdentifier())));
304         } else {
305             LOG.debug("{}: SlicedMessageState for {} was removed from the cache due to {}", logContext,
306                     notification.getKey(), notification.getCause());
307         }
308     }
309
310     private void fail(final SlicedMessageState<ActorRef> state, final Throwable failure) {
311         removeState(state.getIdentifier());
312         state.getOnFailureCallback().accept(failure);
313     }
314
315     @VisibleForTesting
316     boolean hasState(final Identifier forIdentifier) {
317         boolean exists = stateCache.getIfPresent(forIdentifier) != null;
318         stateCache.cleanUp();
319         return exists;
320     }
321
322     public static class Builder {
323         private FileBackedOutputStreamFactory fileBackedStreamFactory;
324         private int messageSliceSize = -1;
325         private long expireStateAfterInactivityDuration = -1;
326         private TimeUnit expireStateAfterInactivityUnit = TimeUnit.MINUTES;
327         private int maxSlicingTries = DEFAULT_MAX_SLICING_TRIES;
328         private String logContext = "<no-context>";
329
330         /**
331          * Sets the factory for creating FileBackedOutputStream instances used for streaming messages. This factory
332          * is used by the {@link MessageSlicer#slice(SliceOptions)} method if a Serializable message is passed.
333          * If Serializable messages aren't passed then the factory need not be set.
334          *
335          * @param newFileBackedStreamFactory the factory for creating FileBackedOutputStream instances
336          * @return this Builder
337          */
338         public Builder fileBackedStreamFactory(final FileBackedOutputStreamFactory newFileBackedStreamFactory) {
339             this.fileBackedStreamFactory = requireNonNull(newFileBackedStreamFactory);
340             return this;
341         }
342
343         /**
344          * Sets the maximum size (in bytes) for a message slice.
345          *
346          * @param newMessageSliceSize the maximum size (in bytes)
347          * @return this Builder
348          */
349         public Builder messageSliceSize(final int newMessageSliceSize) {
350             checkArgument(newMessageSliceSize > 0, "messageSliceSize must be > 0");
351             this.messageSliceSize = newMessageSliceSize;
352             return this;
353         }
354
355         /**
356          * Sets the maximum number of tries for slicing a message. If exceeded, slicing fails. The default is
357          * defined by {@link #DEFAULT_MAX_SLICING_TRIES}
358          *
359          * @param newMaxSlicingTries the maximum number of tries
360          * @return this Builder
361          */
362         public Builder maxSlicingTries(final int newMaxSlicingTries) {
363             checkArgument(newMaxSlicingTries > 0, "newMaxSlicingTries must be > 0");
364             this.maxSlicingTries = newMaxSlicingTries;
365             return this;
366         }
367
368         /**
369          * Sets the duration and time unit whereby sliced message state is purged from the cache and the associated
370          * failure callback is notified due to inactivity from the assembling component on the other end. By default,
371          * state is not purged due to inactivity.
372          *
373          * @param duration the length of time after which a state entry is purged
374          * @param unit the unit the duration is expressed in
375          * @return this Builder
376          */
377         public Builder expireStateAfterInactivity(final long duration, final TimeUnit unit) {
378             checkArgument(duration > 0, "duration must be > 0");
379             this.expireStateAfterInactivityDuration = duration;
380             this.expireStateAfterInactivityUnit = unit;
381             return this;
382         }
383
384         /**
385          * Sets the context for log messages.
386          *
387          * @param newLogContext the log context
388          * @return this Builder
389          */
390         public Builder logContext(final String newLogContext) {
391             this.logContext = requireNonNull(newLogContext);
392             return this;
393         }
394
395         /**
396          * Builds a new MessageSlicer instance.
397          *
398          * @return a new MessageSlicer
399          */
400         public MessageSlicer build() {
401             return new MessageSlicer(this);
402         }
403     }
404 }