Merge "Changed tests to leverage a dynamic port for testing COPS message marshalling...
[packetcable.git] / packetcable-driver / src / main / java / org / umu / cops / ospdp / COPSPdpOSConnection.java
1 package org.umu.cops.ospdp;
2
3 import org.slf4j.Logger;
4 import org.slf4j.LoggerFactory;
5 import org.umu.cops.stack.*;
6 import org.umu.cops.stack.COPSHeader.OPCode;
7
8 import java.io.IOException;
9 import java.net.Socket;
10 import java.util.Date;
11 import java.util.Map;
12 import java.util.concurrent.ConcurrentHashMap;
13
14 /**
15  * Class for managing an outsourcing connection at the PDP side.
16  */
17 public class COPSPdpOSConnection implements Runnable {
18
19     public final static Logger logger = LoggerFactory.getLogger(COPSPdpOSConnection.class);
20
21     /**
22         Socket connected to PEP
23      */
24     private Socket _sock;
25
26     /**
27         PEP identifier
28     */
29     private COPSPepId _pepId;
30
31     /**
32         Time of the latest keep-alive sent
33      */
34     private Date _lastKa;
35
36     /**
37      *  Time of the latest keep-alive received
38      */
39     protected Date _lastRecKa;
40
41     /**
42         Maps a Client Handle to a Handler
43      */
44     protected final Map<String, COPSPdpOSReqStateMan> _managerMap;
45
46     /**
47      *  PDP policy data processor class
48      */
49     protected COPSPdpOSDataProcess _process;
50
51     /**
52         Accounting timer value (secs)
53      */
54     protected short _acctTimer;
55
56     /**
57         Keep-alive timer value (secs)
58      */
59     protected short _kaTimer;
60
61     /**
62         COPS error returned by PEP
63      */
64     protected COPSError _error;
65
66     /**
67      * Creates a new PDP connection
68      *
69      * @param pepId PEP-ID of the connected PEP
70      * @param sock  Socket connected to PEP
71      * @param process   Object for processing policy data
72      */
73     public COPSPdpOSConnection(COPSPepId pepId, Socket sock, COPSPdpOSDataProcess process) {
74         _sock = sock;
75         _pepId = pepId;
76
77         _lastKa = new Date();
78         _managerMap = new ConcurrentHashMap<>();
79
80         _kaTimer = 0;
81         _process = process;
82     }
83
84     /**
85      * Gets the time of that latest keep-alive sent
86      * @return Time of that latest keep-alive sent
87      */
88     public Date getLastKAlive() {
89         return _lastKa;
90     }
91
92     /**
93      * Sets the keep-alive timer value
94      * @param kaTimer Keep-alive timer value (secs)
95      */
96     public void setKaTimer(short kaTimer) {
97         _kaTimer = kaTimer;
98     }
99
100     /**
101      * Gets the keep-alive timer value
102      * @return Keep-alive timer value (secs)
103      */
104     public short getKaTimer() {
105         return _kaTimer;
106     }
107
108     /**
109      * Sets the accounting timer value
110      * @param acctTimer Accounting timer value (secs)
111      */
112     public void setAccTimer(short acctTimer) {
113         _acctTimer = acctTimer;
114     }
115
116     /**
117      * Gets the accounting timer value
118      * @return Accounting timer value (secs)
119      */
120     public short getAcctTimer() {
121         return _acctTimer;
122     }
123
124     /**
125      * Gets the PEP-ID
126      * @return   The ID of the PEP, as a <tt>String</tt>
127      */
128     public String getPepId() {
129         return _pepId.getData().str();
130     }
131
132     /**
133      * Checks whether the socket to the PEP is closed or not
134      * @return   <tt>true</tt> if closed, <tt>false</tt> otherwise
135      */
136     public boolean isClosed() {
137         return _sock.isClosed();
138     }
139
140     /**
141      * Closes the socket to the PEP
142      * @throws IOException
143      */
144     protected void close()
145     throws IOException {
146         _sock.close();
147     }
148
149     /**
150      * Gets the socket to the PEP
151      * @return   Socket connected to the PEP
152      */
153     public Socket getSocket() {
154         return _sock;
155     }
156
157     /**
158      * Main loop
159      */
160     public void run() {
161         Date _lastSendKa = new Date();
162         _lastRecKa = new Date();
163         try {
164             while (!_sock.isClosed()) {
165                 if (_sock.getInputStream().available() != 0) {
166 //                    _lastmessage = processMessage(_sock);
167                     processMessage(_sock);
168                     _lastRecKa = new Date();
169                 }
170
171                 // Keep Alive
172                 if (_kaTimer > 0) {
173                     // Timeout at PDP
174                     int _startTime = (int) (_lastRecKa.getTime());
175                     int cTime = (int) (new Date().getTime());
176
177                     if ((cTime - _startTime) > _kaTimer*1000) {
178                         _sock.close();
179                         // Notify all Request State Managers
180                         notifyNoKAAllReqStateMan();
181                     }
182
183                     // Send to PEP
184                     _startTime = (int) (_lastSendKa.getTime());
185                     cTime = (int) (new Date().getTime());
186
187                     if ((cTime - _startTime) > ((_kaTimer*3/4)*1000)) {
188                         // TODO - is 0 ok for a clientType here???
189                         final COPSKAMsg msg = new COPSKAMsg(null);
190                         COPSTransceiver.sendMsg(msg, _sock);
191                         _lastSendKa = new Date();
192                     }
193                 }
194
195                 try {
196                     Thread.sleep(500);
197                 } catch (Exception e) {
198                     logger.error("Exception caught while sleeping", e);
199                 }
200
201             }
202         } catch (Exception e) {
203             logger.error("Error processing COPS message from socket", e);
204         }
205
206         // connection closed by server
207         // COPSDebug.out(getClass().getName(),"Connection closed by client");
208         try {
209             _sock.close();
210         } catch (IOException e) {
211             logger.error("Error closing socket", e);
212         }
213
214         // Notify all Request State Managers
215         try {
216             notifyCloseAllReqStateMan();
217         } catch (COPSPdpException e) {
218             logger.error("Error closing state managers", e);
219         }
220     }
221
222     /**
223      * Gets a COPS message from the socket and processes it
224      * @param    conn Socket connected to the PEP
225      */
226     private void processMessage(Socket conn) throws COPSPdpException, COPSException, IOException {
227         COPSMsg msg = COPSTransceiver.receiveMsg(conn);
228
229         if (msg.getHeader().getOpCode().equals(OPCode.CC)) {
230             handleClientCloseMsg(conn, msg);
231         } else if (msg.getHeader().getOpCode().equals(OPCode.KA)) {
232             handleKeepAliveMsg(conn, msg);
233         } else if (msg.getHeader().getOpCode().equals(OPCode.REQ)) {
234             handleRequestMsg(conn, msg);
235         } else if (msg.getHeader().getOpCode().equals(OPCode.RPT)) {
236             handleReportMsg(conn, msg);
237         } else if (msg.getHeader().getOpCode().equals(OPCode.DRQ)) {
238             handleDeleteRequestMsg(conn, msg);
239         } else if (msg.getHeader().getOpCode().equals(OPCode.SSC)) {
240             handleSyncComplete(conn, msg);
241         } else {
242             throw new COPSPdpException("Message not expected (" + msg.getHeader().getOpCode() + ").");
243         }
244     }
245
246     /**
247      * Handle Client Close Message, close the passed connection
248      *
249      * @param    conn                a  Socket
250      * @param    msg                 a  COPSMsg
251      *
252      *
253      * <Client-Close> ::= <Common Header>
254      *                      <Error>
255      *                      [<Integrity>]
256      *
257      * Not support [<Integrity>]
258      *
259      */
260     private void handleClientCloseMsg(Socket conn, COPSMsg msg) {
261         COPSClientCloseMsg cMsg = (COPSClientCloseMsg) msg;
262         _error = cMsg.getError();
263
264         // COPSDebug.out(getClass().getName(),"Got close request, closing connection " +
265         //  conn.getInetAddress() + ":" + conn.getPort() + ":[Error " + _error.getDescription() + "]");
266
267         try {
268             // Support
269             if (cMsg.getIntegrity() != null) {
270                 logger.error("Unsupported objects (Integrity) to connection " + conn.getInetAddress());
271             }
272
273             conn.close();
274         } catch (Exception unae) {
275             logger.error("Unexpected exception while closing the connection", unae);
276         }
277     }
278
279     /**
280      * Gets the occurred COPS Error
281      * @return   <tt>COPSError</tt> object
282      */
283     protected COPSError getError()  {
284         return _error;
285     }
286
287     /**
288      * Handle Keep Alive Message
289      *
290      * <Keep-Alive> ::= <Common Header>
291      *                  [<Integrity>]
292      *
293      * Not support [<Integrity>]
294      *
295      * @param    conn                a  Socket
296      * @param    msg                 a  COPSMsg
297      *
298      */
299     private void handleKeepAliveMsg(Socket conn, COPSMsg msg) {
300         COPSKAMsg cMsg = (COPSKAMsg) msg;
301
302         COPSKAMsg kaMsg = (COPSKAMsg) msg;
303         try {
304             // Support
305             if (cMsg.getIntegrity() != null) {
306                 logger.error("Unsupported objects (Integrity) to connection " + conn.getInetAddress());
307             }
308
309             kaMsg.writeData(conn);
310         } catch (Exception unae) {
311             logger.error("Unexpected exception writing COPS data", unae);
312         }
313     }
314
315     /**
316      * Handle Delete Request Message
317      *
318      * <Delete Request> ::= <Common Header>
319      *                      <Client Handle>
320      *                      <Reason>
321      *                      [<Integrity>]
322      *
323      * Not support [<Integrity>]
324      *
325      * @param    conn                a  Socket
326      * @param    msg                 a  COPSMsg
327      *
328      */
329     private void handleDeleteRequestMsg(Socket conn, COPSMsg msg)
330     throws COPSPdpException {
331         COPSDeleteMsg cMsg = (COPSDeleteMsg) msg;
332         // COPSDebug.out(getClass().getName(),"Removing ClientHandle for " +
333         //  conn.getInetAddress() + ":" + conn.getPort() + ":[Reason " + cMsg.getReason().getDescription() + "]");
334
335         // Support
336         if (cMsg.getIntegrity() != null) {
337             logger.error("Unsupported objects (Integrity) to connection " + conn.getInetAddress());
338         }
339
340         // Delete clientHandler
341         if (_managerMap.remove(cMsg.getClientHandle().getId().str()) == null) {
342             // COPSDebug.out(getClass().getName(),"Missing for ClientHandle " +
343             //  cMsg.getClientHandle().getId().getData());
344         }
345
346         final COPSPdpOSReqStateMan man = _managerMap.get(cMsg.getClientHandle().getId().str());
347         if (man == null) {
348             logger.warn("State manager not found for ID - " + cMsg.getClientHandle().getId().str());
349         } else {
350             man.processDeleteRequestState(cMsg);
351         }
352
353     }
354
355     /**
356      * Handle Request Message
357      *
358      * <Request> ::= <Common Header>
359      *                  <Client Handle>
360      *                  <Context>
361      *                  *(<Named ClientSI>)
362      *                  [<Integrity>]
363      * <Named ClientSI> ::= <*(<PRID> <EPD>)>
364      *
365      * Not support [<Integrity>]
366      *
367      * @param    conn                a  Socket
368      * @param    msg                 a  COPSMsg
369      *
370      */
371     private void handleRequestMsg(Socket conn, COPSMsg msg) throws COPSPdpException {
372         final COPSReqMsg reqMsg = (COPSReqMsg) msg;
373         final COPSHeader header = reqMsg.getHeader();
374         final short cType = header.getClientType();
375
376         // Support
377         if (reqMsg.getIntegrity() != null) {
378             logger.error("Unsupported objects (Integrity) to connection " + conn.getInetAddress());
379         }
380
381         final COPSPdpOSReqStateMan man;
382         if (_managerMap.get(reqMsg.getClientHandle().getId().str()) == null) {
383             man = new COPSPdpOSReqStateMan(cType, reqMsg.getClientHandle().getId().str());
384             _managerMap.put(reqMsg.getClientHandle().getId().str(),man);
385             man.setDataProcess(_process);
386             man.initRequestState(_sock);
387         } else {
388             man = _managerMap.get(reqMsg.getClientHandle().getId().str());
389         }
390         man.processRequest(reqMsg);
391     }
392
393     /**
394      * Handle Report Message
395      *
396      * <Report State> ::= <Common Header>
397      *                      <Client Handle>
398      *                      <Report Type>
399      *                      *(<Named ClientSI>)
400      *                      [<Integrity>]
401      *
402      * Not support [<Integrity>]
403      *
404      * @param    conn                a  Socket
405      * @param    msg                 a  COPSMsg
406      *
407      */
408     private void handleReportMsg(Socket conn, COPSMsg msg) throws COPSPdpException {
409         COPSReportMsg repMsg = (COPSReportMsg) msg;
410         // COPSHandle handle = repMsg.getClientHandle();
411         // COPSHeader header = repMsg.getHeader();
412
413         // Support
414         if (repMsg.getIntegrity() != null) {
415             logger.error("Unsupported objects (Integrity) to connection " + conn.getInetAddress());
416         }
417
418         COPSPdpOSReqStateMan man = _managerMap.get(repMsg.getClientHandle().getId().str());
419         if (man == null) {
420             logger.warn("State manager not found for ID - " + repMsg.getClientHandle().getId().str());
421         } else {
422             man.processReport(repMsg);
423         }
424     }
425
426     /**
427      * Method handleSyncComplete
428      *
429      * @param    conn                a  Socket
430      * @param    msg                 a  COPSMsg
431      *
432      */
433     private void handleSyncComplete(Socket conn, COPSMsg msg) throws COPSPdpException {
434         final COPSSyncStateMsg cMsg = (COPSSyncStateMsg) msg;
435
436         // Support
437         if (cMsg.getIntegrity() != null) {
438             logger.error("Unsupported objects (Integrity) to connection " + conn.getInetAddress());
439         }
440
441         final COPSPdpOSReqStateMan man = _managerMap.get(cMsg.getClientHandle().getId().str());
442         if (man == null) {
443             logger.warn("State manager not found for ID - " + cMsg.getClientHandle().getId().str());
444         } else {
445             man.processSyncComplete(cMsg);
446         }
447     }
448
449     /**
450      * Requests a COPS sync from the PEP
451      * @throws COPSException
452      * @throws COPSPdpException
453      */
454     protected void syncAllRequestState() throws COPSException, COPSPdpException {
455         for (final COPSPdpOSReqStateMan man : _managerMap.values()) {
456             man.syncRequestState();
457         }
458     }
459
460     private void notifyCloseAllReqStateMan() throws COPSPdpException {
461         for (final COPSPdpOSReqStateMan man : _managerMap.values()) {
462             man.processClosedConnection(_error);
463         }
464     }
465
466     private void notifyNoKAAllReqStateMan() throws COPSPdpException {
467         for (final COPSPdpOSReqStateMan man : _managerMap.values()) {
468             man.processNoKAConnection();
469         }
470     }
471
472 }