Merge changes If0630105,I9d2d5e61,I1cea2a32,Icc05b6a7,Ic57eb4f8, ...
[packetcable.git] / packetcable-driver / src / main / java / org / pcmm / PCMMPdpConnection.java
1 /*
2  @header@
3  */
4
5 package org.pcmm;
6
7 import org.slf4j.Logger;
8 import org.slf4j.LoggerFactory;
9 import org.umu.cops.prpdp.COPSPdpException;
10 import org.umu.cops.stack.*;
11
12 import javax.annotation.concurrent.ThreadSafe;
13 import java.io.IOException;
14 import java.net.Socket;
15 import java.util.Date;
16 import java.util.Map;
17 import java.util.concurrent.ConcurrentHashMap;
18
19 /**
20  * Class for managing an provisioning connection at the PDP side for receiving and brokering out COPS messages.
21  */
22 @ThreadSafe
23 public class PCMMPdpConnection implements Runnable {
24
25     private final static Logger logger = LoggerFactory.getLogger(PCMMPdpConnection.class);
26
27     /**
28     Socket connected to PEP
29      */
30     private final Socket _sock;
31
32     /**
33      * PEP identifier
34      * TODO - Determine why the original author put this object into this class
35      */
36     private final COPSPepId _pepId;
37
38     /**
39      * Time of the latest keep-alive received
40      */
41     protected Date _lastRecKa;
42
43     /**
44      * Maps a Client Handle to a Handler
45      */
46     protected final Map<String, PCMMPdpReqStateMan> _managerMap;
47
48     /**
49      *  PDP policy data processor class
50      */
51     protected final PCMMPdpDataProcess _process;
52
53     /**
54      * Accounting timer value (secs)
55      */
56     protected final short _acctTimer;
57
58     /**
59      * Keep-alive timer value (secs)
60      */
61     protected final short _kaTimer;
62
63     /**
64      * COPS error returned by PEP on close
65      */
66     protected transient COPSError _error;
67
68     /**
69      * Creates a new PDP connection
70      *
71      * @param pepId PEP-ID of the connected PEP
72      * @param sock Socket connected to PEP
73      * @param process Object for processing policy data
74      */
75     public PCMMPdpConnection(final COPSPepId pepId, final Socket sock, final PCMMPdpDataProcess process,
76                              final short kaTimer, final short acctTimer) {
77         _pepId = pepId;
78         _sock = sock;
79         _process = process;
80         _kaTimer = kaTimer;
81         _acctTimer = acctTimer;
82         _managerMap = new ConcurrentHashMap<>();
83     }
84
85     public void addStateMan(final String key, final PCMMPdpReqStateMan man) {
86         _managerMap.put(key, man);
87     }
88
89     /**
90      * Checks whether the socket to the PEP is closed or not
91      * @return   <tt>true</tt> if closed, <tt>false</tt> otherwise
92      */
93     public boolean isClosed() {
94         return _sock.isClosed();
95     }
96
97     /**
98      * Closes the socket to the PEP
99      * @throws IOException
100      */
101     protected void close() throws IOException {
102         if (!_sock.isClosed()) _sock.close();
103     }
104
105     /**
106      * Gets the socket to the PEP
107      * @return   Socket connected to the PEP
108      */
109     public Socket getSocket() {
110         return _sock;
111     }
112
113     /**
114      * Main loop
115      */
116     public void run () {
117         logger.info("Starting socket listener.");
118         Date _lastSendKa = new Date();
119         _lastRecKa = new Date();
120
121         // Loop while socket is open
122         while (!_sock.isClosed()) {
123             try {
124                 if (_sock.getInputStream().available() != 0) {
125                     logger.info("Waiting to process socket messages");
126                     processMessage(_sock);
127                     logger.info("Message processed");
128                     _lastRecKa = new Date();
129                 }
130
131                 // Keep Alive
132                 if (_kaTimer > 0) {
133                     // Timeout at PDP
134                     int _startTime = (int) (_lastRecKa.getTime());
135                     int cTime = (int) (new Date().getTime());
136
137                     if ((cTime - _startTime) > _kaTimer*1000) {
138                         _sock.close();
139                         // Notify all Request State Managers
140                         notifyNoKAAllReqStateMan();
141                     }
142
143                     // Send to PEP
144                     _startTime = (int) (_lastSendKa.getTime());
145                     cTime = (int) (new Date().getTime());
146
147                     if ((cTime - _startTime) > ((_kaTimer*3/4)*1000)) {
148                         final COPSKAMsg msg = new COPSKAMsg(null);
149                         logger.info("Sending KA message to CCAP");
150                         COPSTransceiver.sendMsg(msg, _sock);
151                         logger.info("Sent KA message gto CCAP");
152                         _lastSendKa = new Date();
153                     }
154                 }
155
156                 try {
157                     Thread.sleep(500);
158                 } catch (InterruptedException e) {
159                     logger.info("Shutting down socket connection to CCAP");
160                     break;
161                 }
162             } catch (IOException e) {
163                 logger.error("Exception reading from socket - exiting", e);
164                 break;
165             } catch (COPSException e) {
166                 logger.error("Exception processing message - continue processing", e);
167             }
168         }
169
170         try {
171             if (! _sock.isClosed())
172                 _sock.close();
173         } catch (IOException e) {
174             logger.error("Error closing socket", e);
175         }
176
177         // Notify all Request State Managers
178         try {
179             notifyCloseAllReqStateMan();
180         } catch (COPSPdpException e) {
181             logger.error("Error closing state managers", e);
182         }
183     }
184
185     /**
186      * Gets a COPS message from the socket and processes it
187      * @param    conn Socket connected to the PEP
188      */
189     private void processMessage(final Socket conn) throws COPSException, IOException {
190         final COPSMsg msg = COPSTransceiver.receiveMsg(conn);
191
192         logger.info("Processing message received of type - " + msg.getHeader().getOpCode());
193
194         switch (msg.getHeader().getOpCode()) {
195             case CC:
196                 handleClientCloseMsg(conn, (COPSClientCloseMsg)msg);
197                 break;
198             case KA:
199                 handleKeepAliveMsg(conn, (COPSKAMsg)msg);
200                 break;
201             case REQ:
202                 handleRequestMsg(conn, (COPSReqMsg)msg);
203                 break;
204             case RPT:
205                 handleReportMsg(conn, (COPSReportMsg)msg);
206                 break;
207             case DRQ:
208                 handleDeleteRequestMsg(conn, (COPSDeleteMsg)msg);
209                 break;
210             case SSQ:
211                 handleSyncComplete(conn, (COPSSyncStateMsg)msg);
212                 break;
213             default:
214                 throw new COPSPdpException("Message not expected (" + msg.getHeader().getOpCode() + ").");
215         }
216     }
217
218     /**
219      * Handle Client Close Message, close the passed connection
220      * @param    conn                a  Socket
221      * @param    cMsg                a  COPSClientCloseMsg
222      */
223     private void handleClientCloseMsg(final Socket conn, final COPSClientCloseMsg cMsg) {
224         _error = cMsg.getError();
225         logger.info("Closing client with error - " + _error.getDescription());
226         try {
227             // Support
228             if (cMsg.getIntegrity() != null) {
229                 logger.error("Unsupported objects (Integrity) to connection " + conn.getInetAddress());
230             }
231
232             conn.close();
233         } catch (Exception unae) {
234             logger.error("Unexpected exception closing connection", unae);
235         }
236     }
237
238     /**
239      * Handle Keep Alive Message
240      * @param    conn                a  Socket
241      * @param    kaMsg               a  COPSKAMsg
242      */
243     private void handleKeepAliveMsg(final Socket conn, final COPSKAMsg kaMsg) {
244         try {
245             // Support
246             if (kaMsg.getIntegrity() != null) {
247                 logger.error("Unsupported objects (Integrity) to connection " + conn.getInetAddress());
248             }
249             kaMsg.writeData(conn);
250         } catch (Exception unae) {
251             logger.error("Unexpected exception while writing keep-alive message", unae);
252         }
253     }
254
255     /**
256      * Handle Delete Request Message
257      * @param    conn                a  Socket
258      * @param    cMsg                a  COPSDeleteMsg
259      */
260     private void handleDeleteRequestMsg(final Socket conn, final COPSDeleteMsg cMsg) throws COPSPdpException {
261         // Support
262         if (cMsg.getIntegrity() != null) {
263             logger.error("Unsupported objects (Integrity) to connection " + conn.getInetAddress());
264         }
265
266         // Delete clientHandler
267         final PCMMPdpReqStateMan man = _managerMap.remove(cMsg.getClientHandle().getId().str());
268         if (man == null) {
269             logger.warn("Cannot delete request state, no state manger found");
270         } else {
271             man.processDeleteRequestState(cMsg);
272         }
273     }
274
275     /**
276      * Handle Request Message
277      * @param    conn                a  Socket
278      * @param    reqMsg              a  COPSReqMsg
279      */
280     private void handleRequestMsg(final Socket conn, final COPSReqMsg reqMsg) throws COPSException {
281         final COPSHeader header = reqMsg.getHeader();
282         final short cType = header.getClientType();
283
284         // Support
285         if (reqMsg.getIntegrity() != null) {
286             logger.error("Unsupported objects (Integrity) to connection " + conn.getInetAddress());
287         }
288
289         final PCMMPdpReqStateMan man;
290         if (_managerMap.get(reqMsg.getClientHandle().getId().str()) == null) {
291
292             man = new PCMMPdpReqStateMan(cType, reqMsg.getClientHandle(), _process);
293             _managerMap.put(reqMsg.getClientHandle().getId().str(), man);
294             man.initRequestState(_sock);
295             logger.info("Created state manager for ID - " + reqMsg.getClientHandle().getId().str());
296         } else {
297             man = _managerMap.get(reqMsg.getClientHandle().getId().str());
298         }
299
300         man.processRequest(reqMsg);
301     }
302
303     /**
304      * Handle Report Message
305      * @param    conn                a  Socket
306      * @param    repMsg              a  COPSReportMsg
307      */
308     private void handleReportMsg(final Socket conn, final COPSReportMsg repMsg) throws COPSPdpException {
309         // Support
310         if (repMsg.getIntegrity() != null) {
311             logger.error("Unsupported objects (Integrity) to connection " + conn.getInetAddress());
312         }
313
314         final PCMMPdpReqStateMan man = _managerMap.get(repMsg.getClientHandle().getId().str());
315         if (man == null) {
316             logger.warn("State manager not found");
317         } else {
318             man.processReport(repMsg);
319         }
320     }
321
322     /**
323      * Method handleSyncComplete
324      * @param    conn                a  Socket
325      * @param    cMsg                a  COPSSyncStateMsg
326      */
327     private void handleSyncComplete(final Socket conn, final COPSSyncStateMsg cMsg) throws COPSException {
328         // Support
329         if (cMsg.getIntegrity() != null) {
330             logger.error("Unsupported objects (Integrity) to connection " + conn.getInetAddress());
331         }
332
333         final PCMMPdpReqStateMan man = _managerMap.get(cMsg.getClientHandle().getId().str());
334         if (man == null) {
335             logger.warn("State manager not found");
336         } else {
337             man.processSyncComplete(cMsg);
338         }
339     }
340
341     /**
342      * Requests a COPS sync from the PEP
343      * @throws COPSException
344      * @throws COPSPdpException
345      */
346     protected void syncAllRequestState() throws COPSException {
347         for (final PCMMPdpReqStateMan man : _managerMap.values()) {
348             man.syncRequestState();
349         }
350     }
351
352     private void notifyCloseAllReqStateMan() throws COPSPdpException {
353         for (final PCMMPdpReqStateMan man : _managerMap.values()) {
354             man.processClosedConnection(_error);
355         }
356     }
357
358     private void notifyNoKAAllReqStateMan() throws COPSPdpException {
359         for (final PCMMPdpReqStateMan man : _managerMap.values()) {
360             man.processNoKAConnection();
361         }
362     }
363
364 }