1454e72cc063d2323ca4844cb9e8dcb5ae7e3d9f
[controller.git] / opendaylight / md-sal / sal-clustering-commons / src / main / java / org / opendaylight / controller / cluster / messaging / MessageSlicer.java
1 /*
2  * Copyright (c) 2017 Inocybe Technologies and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.messaging;
9
10 import akka.actor.ActorRef;
11 import com.google.common.annotations.VisibleForTesting;
12 import com.google.common.base.Preconditions;
13 import com.google.common.cache.Cache;
14 import com.google.common.cache.CacheBuilder;
15 import com.google.common.cache.RemovalListener;
16 import com.google.common.cache.RemovalNotification;
17 import java.io.IOException;
18 import java.io.ObjectOutputStream;
19 import java.io.Serializable;
20 import java.util.Optional;
21 import java.util.concurrent.TimeUnit;
22 import java.util.concurrent.atomic.AtomicLong;
23 import org.opendaylight.controller.cluster.io.FileBackedOutputStream;
24 import org.opendaylight.controller.cluster.io.FileBackedOutputStreamFactory;
25 import org.opendaylight.yangtools.concepts.Identifier;
26 import org.slf4j.Logger;
27 import org.slf4j.LoggerFactory;
28
29 /**
30  * This class slices messages into smaller chunks. {@link MessageAssembler} is used to re-assemble the messages.
31  *
32  * @author Thomas Pantelis
33  * @see MessageAssembler
34  */
35 public class MessageSlicer implements AutoCloseable {
36     private static final Logger LOG = LoggerFactory.getLogger(MessageSlicer.class);
37     private static final AtomicLong SLICER_ID_COUNTER = new AtomicLong(1);
38     public static final int DEFAULT_MAX_SLICING_TRIES = 3;
39
40     private final Cache<Identifier, SlicedMessageState<ActorRef>> stateCache;
41     private final FileBackedOutputStreamFactory fileBackedStreamFactory;
42     private final int messageSliceSize;
43     private final int maxSlicingTries;
44     private final String logContext;
45     private final long id;
46
47     private MessageSlicer(Builder builder) {
48         this.fileBackedStreamFactory = builder.fileBackedStreamFactory;
49         this.messageSliceSize = builder.messageSliceSize;
50         this.maxSlicingTries = builder.maxSlicingTries;
51
52         id = SLICER_ID_COUNTER.getAndIncrement();
53         this.logContext = builder.logContext + "_slicer-id-" + id;
54
55         CacheBuilder<Identifier, SlicedMessageState<ActorRef>> cacheBuilder = CacheBuilder.newBuilder().removalListener(
56                 (RemovalListener<Identifier, SlicedMessageState<ActorRef>>) notification -> stateRemoved(notification));
57         if (builder.expireStateAfterInactivityDuration > 0) {
58             cacheBuilder = cacheBuilder.expireAfterAccess(builder.expireStateAfterInactivityDuration,
59                     builder.expireStateAfterInactivityUnit);
60         }
61
62         stateCache = cacheBuilder.build();
63     }
64
65     @VisibleForTesting
66     long getId() {
67         return id;
68     }
69
70     /**
71      * Returns a new Builder for creating MessageSlicer instances.
72      *
73      * @return a Builder instance
74      */
75     public static Builder builder() {
76         return new Builder();
77     }
78
79     /**
80      * Checks if the given message is handled by this class. If so, it should be forwarded to the
81      * {@link #handleMessage(Object)} method
82      *
83      * @param message the message to check
84      * @return true if handled, false otherwise
85      */
86     public static boolean isHandledMessage(Object message) {
87         return message instanceof MessageSliceReply;
88     }
89
90     /**
91      * Slices a message into chunks based on the serialized size, the maximum message slice size and the given
92      * options.
93      *
94      * @param options the SliceOptions
95      */
96     public void slice(SliceOptions options) {
97         final Identifier identifier = options.getIdentifier();
98         final Serializable message = options.getMessage();
99         final FileBackedOutputStream fileBackedStream;
100         if (message != null) {
101             LOG.debug("{}: slice: identifier: {}, message: {}", logContext, identifier, message);
102
103             Preconditions.checkNotNull(fileBackedStreamFactory,
104                     "The FiledBackedStreamFactory must be set in order to call this slice method");
105
106             // Serialize the message to a FileBackedOutputStream.
107             fileBackedStream = fileBackedStreamFactory.newInstance();
108             try (ObjectOutputStream out = new ObjectOutputStream(fileBackedStream)) {
109                 out.writeObject(message);
110             } catch (IOException e) {
111                 LOG.debug("{}: Error serializing message for {}", logContext, identifier, e);
112                 fileBackedStream.cleanup();
113                 options.getOnFailureCallback().accept(e);
114                 return;
115             }
116         } else {
117             fileBackedStream = options.getFileBackedStream();
118         }
119
120         initializeSlicing(options, fileBackedStream);
121     }
122
123     private void initializeSlicing(final SliceOptions options, final FileBackedOutputStream fileBackedStream) {
124         final Identifier identifier = options.getIdentifier();
125         MessageSliceIdentifier messageSliceId = new MessageSliceIdentifier(identifier, id);
126         SlicedMessageState<ActorRef> state = null;
127         try {
128             state = new SlicedMessageState<>(messageSliceId, fileBackedStream, messageSliceSize, maxSlicingTries,
129                     options.getReplyTo(), options.getOnFailureCallback(), logContext);
130
131             final Serializable message = options.getMessage();
132             if (state.getTotalSlices() == 1 && message != null) {
133                 LOG.debug("{}: Message does not need to be sliced - sending original message", logContext);
134                 state.close();
135                 sendTo(options, message, options.getReplyTo());
136                 return;
137             }
138
139             final MessageSlice firstSlice = getNextSliceMessage(state);
140
141             LOG.debug("{}: Sending first slice: {}", logContext, firstSlice);
142
143             stateCache.put(messageSliceId, state);
144             sendTo(options, firstSlice, ActorRef.noSender());
145         } catch (IOException e) {
146             LOG.error("{}: Error initializing SlicedMessageState for {}", logContext, identifier, e);
147             if (state != null) {
148                 state.close();
149             } else {
150                 fileBackedStream.cleanup();
151             }
152
153             options.getOnFailureCallback().accept(e);
154         }
155     }
156
157     private void sendTo(SliceOptions options, Object message, ActorRef sender) {
158         if (options.getSendToRef() != null) {
159             options.getSendToRef().tell(message, sender);
160         } else {
161             options.getSendToSelection().tell(message, sender);
162         }
163     }
164
165     /**
166      * Invoked to handle messages pertaining to this class.
167      *
168      * @param message the message
169      * @return true if the message was handled, false otherwise
170      */
171     public boolean handleMessage(final Object message) {
172         if (message instanceof MessageSliceReply) {
173             LOG.debug("{}: handleMessage: {}", logContext, message);
174             return onMessageSliceReply((MessageSliceReply) message);
175         }
176
177         return false;
178     }
179
180     /**
181      * Checks for and removes sliced message state that has expired due to inactivity from the assembling component
182      * on the other end.
183      */
184     public void checkExpiredSlicedMessageState() {
185         if (stateCache.size() > 0) {
186             stateCache.cleanUp();
187         }
188     }
189
190     /**
191      * Closes and removes all in-progress sliced message state.
192      */
193     @Override
194     public void close() {
195         LOG.debug("{}: Closing", logContext);
196         stateCache.invalidateAll();
197     }
198
199     private MessageSlice getNextSliceMessage(SlicedMessageState<ActorRef> state) throws IOException {
200         final byte[] firstSliceBytes = state.getNextSlice();
201         return new MessageSlice(state.getIdentifier(), firstSliceBytes, state.getCurrentSliceIndex(),
202                 state.getTotalSlices(), state.getLastSliceHashCode(), state.getReplyTarget());
203     }
204
205     private boolean onMessageSliceReply(final MessageSliceReply reply) {
206         final Identifier identifier = reply.getIdentifier();
207         if (!(identifier instanceof MessageSliceIdentifier)
208                 || ((MessageSliceIdentifier)identifier).getSlicerId() != id) {
209             return false;
210         }
211
212         final SlicedMessageState<ActorRef> state = stateCache.getIfPresent(identifier);
213         if (state == null) {
214             LOG.warn("{}: SlicedMessageState not found for {}", logContext, reply);
215             reply.getSendTo().tell(new AbortSlicing(identifier), ActorRef.noSender());
216             return true;
217         }
218
219         synchronized (state) {
220             try {
221                 final Optional<MessageSliceException> failure = reply.getFailure();
222                 if (failure.isPresent()) {
223                     LOG.warn("{}: Received failed {}", logContext, reply);
224                     processMessageSliceException(failure.get(), state, reply.getSendTo());
225                     return true;
226                 }
227
228                 if (state.getCurrentSliceIndex() != reply.getSliceIndex()) {
229                     LOG.warn("{}: Slice index {} in {} does not match expected index {}", logContext,
230                             reply.getSliceIndex(), reply, state.getCurrentSliceIndex());
231                     reply.getSendTo().tell(new AbortSlicing(identifier), ActorRef.noSender());
232                     possiblyRetrySlicing(state, reply.getSendTo());
233                     return true;
234                 }
235
236                 if (state.isLastSlice(reply.getSliceIndex())) {
237                     LOG.debug("{}: Received last slice reply for {}", logContext, identifier);
238                     removeState(identifier);
239                 } else {
240                     final MessageSlice nextSlice = getNextSliceMessage(state);
241                     LOG.debug("{}: Sending next slice: {}", logContext, nextSlice);
242                     reply.getSendTo().tell(nextSlice, ActorRef.noSender());
243                 }
244             } catch (IOException e) {
245                 LOG.warn("{}: Error processing {}", logContext, reply, e);
246                 fail(state, e);
247             }
248         }
249
250         return true;
251     }
252
253     private void processMessageSliceException(final MessageSliceException exception,
254             final SlicedMessageState<ActorRef> state, final ActorRef sendTo) throws IOException {
255         if (exception.isRetriable()) {
256             possiblyRetrySlicing(state, sendTo);
257         } else {
258             fail(state, exception.getCause() != null ? exception.getCause() : exception);
259         }
260     }
261
262     private void possiblyRetrySlicing(final SlicedMessageState<ActorRef> state, final ActorRef sendTo)
263             throws IOException {
264         if (state.canRetry()) {
265             LOG.info("{}: Retrying message slicing for {}", logContext, state.getIdentifier());
266             state.reset();
267             sendTo.tell(getNextSliceMessage(state), ActorRef.noSender());
268         } else {
269             String message = String.format("Maximum slicing retries reached for identifier %s - failing the message",
270                     state.getIdentifier());
271             LOG.warn(message);
272             fail(state, new RuntimeException(message));
273         }
274     }
275
276     private void removeState(final Identifier identifier) {
277         LOG.debug("{}: Removing state for {}", logContext, identifier);
278         stateCache.invalidate(identifier);
279     }
280
281     private void stateRemoved(RemovalNotification<Identifier, SlicedMessageState<ActorRef>> notification) {
282         final SlicedMessageState<ActorRef> state = notification.getValue();
283         state.close();
284         if (notification.wasEvicted()) {
285             LOG.warn("{}: SlicedMessageState for {} was expired from the cache", logContext, notification.getKey());
286             state.getOnFailureCallback().accept(new RuntimeException(String.format(
287                     "The slicing state for message identifier %s was expired due to inactivity from the assembling "
288                      + "component on the other end", state.getIdentifier())));
289         } else {
290             LOG.debug("{}: SlicedMessageState for {} was removed from the cache due to {}", logContext,
291                     notification.getKey(), notification.getCause());
292         }
293     }
294
295     private void fail(final SlicedMessageState<ActorRef> state, final Throwable failure) {
296         removeState(state.getIdentifier());
297         state.getOnFailureCallback().accept(failure);
298     }
299
300     @VisibleForTesting
301     boolean hasState(Identifier forIdentifier) {
302         boolean exists = stateCache.getIfPresent(forIdentifier) != null;
303         stateCache.cleanUp();
304         return exists;
305     }
306
307     public static class Builder {
308         private FileBackedOutputStreamFactory fileBackedStreamFactory;
309         private int messageSliceSize = -1;
310         private long expireStateAfterInactivityDuration = -1;
311         private TimeUnit expireStateAfterInactivityUnit = TimeUnit.MINUTES;
312         private int maxSlicingTries = DEFAULT_MAX_SLICING_TRIES;
313         private String logContext = "<no-context>";
314
315         /**
316          * Sets the factory for creating FileBackedOutputStream instances used for streaming messages. This factory
317          * is used by the {@link MessageSlicer#slice(SliceOptions)} method if a Serializable message is passed.
318          * If Serializable messages aren't passed then the factory need not be set.
319          *
320          * @param newFileBackedStreamFactory the factory for creating FileBackedOutputStream instances
321          * @return this Builder
322          */
323         public Builder fileBackedStreamFactory(final FileBackedOutputStreamFactory newFileBackedStreamFactory) {
324             this.fileBackedStreamFactory = Preconditions.checkNotNull(newFileBackedStreamFactory);
325             return this;
326         }
327
328         /**
329          * Sets the maximum size (in bytes) for a message slice.
330          *
331          * @param newMessageSliceSize the maximum size (in bytes)
332          * @return this Builder
333          */
334         public Builder messageSliceSize(final int newMessageSliceSize) {
335             Preconditions.checkArgument(newMessageSliceSize > 0, "messageSliceSize must be > 0");
336             this.messageSliceSize = newMessageSliceSize;
337             return this;
338         }
339
340         /**
341          * Sets the maximum number of tries for slicing a message. If exceeded, slicing fails. The default is
342          * defined by {@link #DEFAULT_MAX_SLICING_TRIES}
343          *
344          * @param newMaxSlicingTries the maximum number of tries
345          * @return this Builder
346          */
347         public Builder maxSlicingTries(final int newMaxSlicingTries) {
348             Preconditions.checkArgument(newMaxSlicingTries > 0, "newMaxSlicingTries must be > 0");
349             this.maxSlicingTries = newMaxSlicingTries;
350             return this;
351         }
352
353         /**
354          * Sets the duration and time unit whereby sliced message state is purged from the cache and the associated
355          * failure callback is notified due to inactivity from the assembling component on the other end. By default,
356          * state is not purged due to inactivity.
357          *
358          * @param duration the length of time after which a state entry is purged
359          * @param unit the unit the duration is expressed in
360          * @return this Builder
361          */
362         public Builder expireStateAfterInactivity(final long duration, final TimeUnit unit) {
363             Preconditions.checkArgument(duration > 0, "duration must be > 0");
364             this.expireStateAfterInactivityDuration = duration;
365             this.expireStateAfterInactivityUnit = unit;
366             return this;
367         }
368
369         /**
370          * Sets the context for log messages.
371          *
372          * @param newLogContext the log context
373          * @return this Builder
374          */
375         public Builder logContext(final String newLogContext) {
376             this.logContext = Preconditions.checkNotNull(newLogContext);
377             return this;
378         }
379
380         /**
381          * Builds a new MessageSlicer instance.
382          *
383          * @return a new MessageSlicer
384          */
385         public MessageSlicer build() {
386             return new MessageSlicer(this);
387         }
388     }
389 }