Simplify code using Java 8 features
[controller.git] / opendaylight / md-sal / sal-clustering-commons / src / main / java / org / opendaylight / controller / cluster / messaging / MessageAssembler.java
1 /*
2  * Copyright (c) 2017 Inocybe Technologies and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.messaging;
9
10 import akka.actor.ActorRef;
11 import com.google.common.annotations.VisibleForTesting;
12 import com.google.common.base.Preconditions;
13 import com.google.common.cache.Cache;
14 import com.google.common.cache.CacheBuilder;
15 import com.google.common.cache.RemovalNotification;
16 import com.google.common.io.ByteSource;
17 import java.io.IOException;
18 import java.io.ObjectInputStream;
19 import java.util.concurrent.ExecutionException;
20 import java.util.concurrent.TimeUnit;
21 import java.util.function.BiConsumer;
22 import javax.annotation.Nonnull;
23 import org.opendaylight.controller.cluster.io.FileBackedOutputStreamFactory;
24 import org.opendaylight.yangtools.concepts.Identifier;
25 import org.slf4j.Logger;
26 import org.slf4j.LoggerFactory;
27
28 /**
29  * This class re-assembles messages sliced into smaller chunks by {@link MessageSlicer}.
30  *
31  * @author Thomas Pantelis
32  * @see MessageSlicer
33  */
34 public final  class MessageAssembler implements AutoCloseable {
35     private static final Logger LOG = LoggerFactory.getLogger(MessageAssembler.class);
36
37     private final Cache<Identifier, AssembledMessageState> stateCache;
38     private final FileBackedOutputStreamFactory fileBackedStreamFactory;
39     private final BiConsumer<Object, ActorRef> assembledMessageCallback;
40     private final String logContext;
41
42     MessageAssembler(final Builder builder) {
43         this.fileBackedStreamFactory = Preconditions.checkNotNull(builder.fileBackedStreamFactory,
44                 "FiledBackedStreamFactory cannot be null");
45         this.assembledMessageCallback = Preconditions.checkNotNull(builder.assembledMessageCallback,
46                 "assembledMessageCallback cannot be null");
47         this.logContext = builder.logContext;
48
49         stateCache = CacheBuilder.newBuilder()
50                 .expireAfterAccess(builder.expireStateAfterInactivityDuration, builder.expireStateAfterInactivityUnit)
51                 .removalListener(this::stateRemoved).build();
52     }
53
54     /**
55      * Returns a new Builder for creating MessageAssembler instances.
56      *
57      * @return a Builder instance
58      */
59     public static Builder builder() {
60         return new Builder();
61     }
62
63     /**
64      * Checks if the given message is handled by this class. If so, it should be forwarded to the
65      * {@link #handleMessage(Object, ActorRef)} method
66      *
67      * @param message the message to check
68      * @return true if handled, false otherwise
69      */
70     public static boolean isHandledMessage(final Object message) {
71         return message instanceof MessageSlice || message instanceof AbortSlicing;
72     }
73
74     @Override
75     public void close() {
76         LOG.debug("{}: Closing", logContext);
77         stateCache.invalidateAll();
78     }
79
80     /**
81      * Checks for and removes assembled message state that has expired due to inactivity from the slicing component
82      * on the other end.
83      */
84     public void checkExpiredAssembledMessageState() {
85         if (stateCache.size() > 0) {
86             stateCache.cleanUp();
87         }
88     }
89
90     /**
91      * Invoked to handle message slices and other messages pertaining to this class.
92      *
93      * @param message the message
94      * @param sendTo the reference of the actor to which subsequent message slices should be sent
95      * @return true if the message was handled, false otherwise
96      */
97     public boolean handleMessage(final Object message, final @Nonnull ActorRef sendTo) {
98         if (message instanceof MessageSlice) {
99             LOG.debug("{}: handleMessage: {}", logContext, message);
100             onMessageSlice((MessageSlice) message, sendTo);
101             return true;
102         } else if (message instanceof AbortSlicing) {
103             LOG.debug("{}: handleMessage: {}", logContext, message);
104             onAbortSlicing((AbortSlicing) message);
105             return true;
106         }
107
108         return false;
109     }
110
111     private void onMessageSlice(final MessageSlice messageSlice, final ActorRef sendTo) {
112         final Identifier identifier = messageSlice.getIdentifier();
113         try {
114             final AssembledMessageState state = stateCache.get(identifier, () -> createState(messageSlice));
115             processMessageSliceForState(messageSlice, state, sendTo);
116         } catch (ExecutionException e) {
117             final MessageSliceException messageSliceEx;
118             final Throwable cause = e.getCause();
119             if (cause instanceof MessageSliceException) {
120                 messageSliceEx = (MessageSliceException) cause;
121             } else {
122                 messageSliceEx = new MessageSliceException(String.format(
123                         "Error creating state for identifier %s", identifier), cause);
124             }
125
126             messageSlice.getReplyTo().tell(MessageSliceReply.failed(identifier, messageSliceEx, sendTo),
127                     ActorRef.noSender());
128         }
129     }
130
131     private AssembledMessageState createState(final MessageSlice messageSlice) throws MessageSliceException {
132         final Identifier identifier = messageSlice.getIdentifier();
133         if (messageSlice.getSliceIndex() == SlicedMessageState.FIRST_SLICE_INDEX) {
134             LOG.debug("{}: Received first slice for {} - creating AssembledMessageState", logContext, identifier);
135             return new AssembledMessageState(identifier, messageSlice.getTotalSlices(),
136                     fileBackedStreamFactory, logContext);
137         }
138
139         LOG.debug("{}: AssembledMessageState not found for {} - returning failed reply", logContext, identifier);
140         throw new MessageSliceException(String.format(
141                 "No assembled state found for identifier %s and slice index %s", identifier,
142                 messageSlice.getSliceIndex()), true);
143     }
144
145     private void processMessageSliceForState(final MessageSlice messageSlice, final AssembledMessageState state,
146             final ActorRef sendTo) {
147         final Identifier identifier = messageSlice.getIdentifier();
148         final ActorRef replyTo = messageSlice.getReplyTo();
149         Object reAssembledMessage = null;
150         synchronized (state) {
151             final int sliceIndex = messageSlice.getSliceIndex();
152             try {
153                 final MessageSliceReply successReply = MessageSliceReply.success(identifier, sliceIndex, sendTo);
154                 if (state.addSlice(sliceIndex, messageSlice.getData(), messageSlice.getLastSliceHashCode())) {
155                     LOG.debug("{}: Received last slice for {}", logContext, identifier);
156
157                     reAssembledMessage = reAssembleMessage(state);
158
159                     replyTo.tell(successReply, ActorRef.noSender());
160                     removeState(identifier);
161                 } else {
162                     LOG.debug("{}: Added slice for {} - expecting more", logContext, identifier);
163                     replyTo.tell(successReply, ActorRef.noSender());
164                 }
165             } catch (MessageSliceException e) {
166                 LOG.warn("{}: Error processing {}", logContext, messageSlice, e);
167                 replyTo.tell(MessageSliceReply.failed(identifier, e, sendTo), ActorRef.noSender());
168                 removeState(identifier);
169             }
170         }
171
172         if (reAssembledMessage != null) {
173             LOG.debug("{}: Notifying callback of re-assembled message {}", logContext, reAssembledMessage);
174             assembledMessageCallback.accept(reAssembledMessage, replyTo);
175         }
176     }
177
178     private static Object reAssembleMessage(final AssembledMessageState state) throws MessageSliceException {
179         try {
180             final ByteSource assembledBytes = state.getAssembledBytes();
181             try (ObjectInputStream in = new ObjectInputStream(assembledBytes.openStream())) {
182                 return in.readObject();
183             }
184
185         } catch (IOException | ClassNotFoundException  e) {
186             throw new MessageSliceException(String.format("Error re-assembling bytes for identifier %s",
187                     state.getIdentifier()), e);
188         }
189     }
190
191     private void onAbortSlicing(final AbortSlicing message) {
192         removeState(message.getIdentifier());
193     }
194
195     private void removeState(final Identifier identifier) {
196         LOG.debug("{}: Removing state for {}", logContext, identifier);
197         stateCache.invalidate(identifier);
198     }
199
200     private void stateRemoved(final RemovalNotification<Identifier, AssembledMessageState> notification) {
201         if (notification.wasEvicted()) {
202             LOG.warn("{}: AssembledMessageState for {} was expired from the cache", logContext, notification.getKey());
203         } else {
204             LOG.debug("{}: AssembledMessageState for {} was removed from the cache due to {}", logContext,
205                     notification.getKey(), notification.getCause());
206         }
207
208         notification.getValue().close();
209     }
210
211     @VisibleForTesting
212     boolean hasState(final Identifier forIdentifier) {
213         boolean exists = stateCache.getIfPresent(forIdentifier) != null;
214         stateCache.cleanUp();
215         return exists;
216     }
217
218     public static class Builder {
219         private FileBackedOutputStreamFactory fileBackedStreamFactory;
220         private BiConsumer<Object, ActorRef> assembledMessageCallback;
221         private long expireStateAfterInactivityDuration = 1;
222         private TimeUnit expireStateAfterInactivityUnit = TimeUnit.MINUTES;
223         private String logContext = "<no-context>";
224
225         /**
226          * Sets the factory for creating FileBackedOutputStream instances used for streaming messages.
227          *
228          * @param newFileBackedStreamFactory the factory for creating FileBackedOutputStream instances
229          * @return this Builder
230          */
231         public Builder fileBackedStreamFactory(final FileBackedOutputStreamFactory newFileBackedStreamFactory) {
232             this.fileBackedStreamFactory = Preconditions.checkNotNull(newFileBackedStreamFactory);
233             return this;
234         }
235
236         /**
237          * Sets the Consumer callback for assembled messages. The callback takes the assembled message and the
238          * original sender ActorRef as arguments.
239          *
240          * @param newAssembledMessageCallback the Consumer callback
241          * @return this Builder
242          */
243         public Builder assembledMessageCallback(final BiConsumer<Object, ActorRef> newAssembledMessageCallback) {
244             this.assembledMessageCallback = newAssembledMessageCallback;
245             return this;
246         }
247
248         /**
249          * Sets the duration and time unit whereby assembled message state is purged from the cache due to
250          * inactivity from the slicing component on the other end. By default, state is purged after 1 minute of
251          * inactivity.
252          *
253          * @param duration the length of time after which a state entry is purged
254          * @param unit the unit the duration is expressed in
255          * @return this Builder
256          */
257         public Builder expireStateAfterInactivity(final long duration, final TimeUnit unit) {
258             Preconditions.checkArgument(duration > 0, "duration must be > 0");
259             this.expireStateAfterInactivityDuration = duration;
260             this.expireStateAfterInactivityUnit = unit;
261             return this;
262         }
263
264         /**
265          * Sets the context for log messages.
266          *
267          * @param newLogContext the log context
268          * @return this Builder
269          */
270         public Builder logContext(final String newLogContext) {
271             this.logContext = newLogContext;
272             return this;
273         }
274
275         /**
276          * Builds a new MessageAssembler instance.
277          *
278          * @return a new MessageAssembler
279          */
280         public MessageAssembler build() {
281             return new MessageAssembler(this);
282         }
283     }
284 }