xrootd
Loading...
Searching...
No Matches
XrdClStream.hh
Go to the documentation of this file.
1//------------------------------------------------------------------------------
2// Copyright (c) 2011-2012 by European Organization for Nuclear Research (CERN)
3// Author: Lukasz Janyst <ljanyst@cern.ch>
4//------------------------------------------------------------------------------
5// XRootD is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Lesser General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// XRootD is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU General Public License for more details.
14//
15// You should have received a copy of the GNU Lesser General Public License
16// along with XRootD. If not, see <http://www.gnu.org/licenses/>.
17//------------------------------------------------------------------------------
18
19#ifndef __XRD_CL_STREAM_HH__
20#define __XRD_CL_STREAM_HH__
21
22#include "XrdCl/XrdClPoller.hh"
23#include "XrdCl/XrdClStatus.hh"
24#include "XrdCl/XrdClURL.hh"
28#include "XrdCl/XrdClInQueue.hh"
29#include "XrdCl/XrdClUtils.hh"
30
33#include "XrdNet/XrdNetAddr.hh"
34#include <list>
35#include <vector>
36#include <functional>
37#include <memory>
38
39namespace XrdCl
40{
41 class Message;
42 class Channel;
43 class TransportHandler;
44 class TaskManager;
45 struct SubStreamData;
46
47 //----------------------------------------------------------------------------
49 //----------------------------------------------------------------------------
50 class Stream
51 {
52 public:
53 //------------------------------------------------------------------------
55 //------------------------------------------------------------------------
57 {
61 Error = 3
62 };
63
64 //------------------------------------------------------------------------
66 //------------------------------------------------------------------------
67 Stream( const URL *url, const URL &prefer = URL() );
68
69 //------------------------------------------------------------------------
71 //------------------------------------------------------------------------
73
74 //------------------------------------------------------------------------
76 //------------------------------------------------------------------------
78
79 //------------------------------------------------------------------------
81 //------------------------------------------------------------------------
83 MsgHandler *handler,
84 bool stateful,
85 time_t expires );
86
87 //------------------------------------------------------------------------
89 //------------------------------------------------------------------------
90 void SetTransport( TransportHandler *transport )
91 {
92 pTransport = transport;
93 }
94
95 //------------------------------------------------------------------------
97 //------------------------------------------------------------------------
98 void SetPoller( Poller *poller )
99 {
100 pPoller = poller;
101 }
102
103 //------------------------------------------------------------------------
105 //------------------------------------------------------------------------
106 void SetIncomingQueue( InQueue *incomingQueue )
107 {
108 pIncomingQueue = incomingQueue;
109 }
110
111 //------------------------------------------------------------------------
113 //------------------------------------------------------------------------
114 void SetChannelData( AnyObject *channelData )
115 {
116 pChannelData = channelData;
117 }
118
119 //------------------------------------------------------------------------
121 //------------------------------------------------------------------------
122 void SetTaskManager( TaskManager *taskManager )
123 {
124 pTaskManager = taskManager;
125 }
126
127 //------------------------------------------------------------------------
129 //------------------------------------------------------------------------
130 void SetJobManager( JobManager *jobManager )
131 {
132 pJobManager = jobManager;
133 }
134
135 //------------------------------------------------------------------------
139 //------------------------------------------------------------------------
141
142 //------------------------------------------------------------------------
144 //------------------------------------------------------------------------
145 void Disconnect( bool force = false );
146
147 //------------------------------------------------------------------------
150 //------------------------------------------------------------------------
151 void Tick( time_t now );
152
153 //------------------------------------------------------------------------
155 //------------------------------------------------------------------------
156 const URL *GetURL() const
157 {
158 return pUrl;
159 }
160
161 //------------------------------------------------------------------------
163 //------------------------------------------------------------------------
165
166 //------------------------------------------------------------------------
168 //------------------------------------------------------------------------
169 const std::string &GetName() const
170 {
171 return pStreamName;
172 }
173
174 //------------------------------------------------------------------------
176 //------------------------------------------------------------------------
177 void DisableIfEmpty( uint16_t subStream );
178
179 //------------------------------------------------------------------------
181 //------------------------------------------------------------------------
182 void OnIncoming( uint16_t subStream,
183 std::shared_ptr<Message> msg,
184 uint32_t bytesReceived );
185
186 //------------------------------------------------------------------------
187 // Call when one of the sockets is ready to accept a new message
188 //------------------------------------------------------------------------
189 std::pair<Message *, MsgHandler *>
190 OnReadyToWrite( uint16_t subStream );
191
192 //------------------------------------------------------------------------
193 // Call when a message is written to the socket
194 //------------------------------------------------------------------------
195 void OnMessageSent( uint16_t subStream,
196 Message *msg,
197 uint32_t bytesSent );
198
199 //------------------------------------------------------------------------
201 //------------------------------------------------------------------------
202 void OnConnect( uint16_t subStream );
203
204 //------------------------------------------------------------------------
206 //------------------------------------------------------------------------
207 void OnConnectError( uint16_t subStream, XRootDStatus status );
208
209 //------------------------------------------------------------------------
211 //------------------------------------------------------------------------
212 void OnError( uint16_t subStream, XRootDStatus status );
213
214 //------------------------------------------------------------------------
216 //------------------------------------------------------------------------
217 void ForceError( XRootDStatus status );
218
219 //------------------------------------------------------------------------
221 //------------------------------------------------------------------------
222 void OnReadTimeout( uint16_t subStream );
223
224 //------------------------------------------------------------------------
226 //------------------------------------------------------------------------
227 void OnWriteTimeout( uint16_t subStream );
228
229 //------------------------------------------------------------------------
231 //------------------------------------------------------------------------
233
234 //------------------------------------------------------------------------
236 //------------------------------------------------------------------------
238
239 //------------------------------------------------------------------------
248 //------------------------------------------------------------------------
250 InstallIncHandler( std::shared_ptr<Message> &msg, uint16_t stream );
251
252 //------------------------------------------------------------------------
256 //------------------------------------------------------------------------
257 uint16_t InspectStatusRsp( uint16_t stream, MsgHandler *&incHandler );
258
259 //------------------------------------------------------------------------
261 //------------------------------------------------------------------------
262 void SetOnDataConnectHandler( std::shared_ptr<Job> &onConnJob )
263 {
264 XrdSysMutexHelper scopedLock( pMutex );
265 pOnDataConnJob = onConnJob;
266 }
267
268 //------------------------------------------------------------------------
271 //------------------------------------------------------------------------
272 bool CanCollapse( const URL &url );
273
274 //------------------------------------------------------------------------
276 //------------------------------------------------------------------------
277 Status Query( uint16_t query, AnyObject &result );
278
279 private:
280
281 //------------------------------------------------------------------------
283 //------------------------------------------------------------------------
284 static bool IsPartial( Message &msg );
285
286 //------------------------------------------------------------------------
288 //------------------------------------------------------------------------
289 inline static bool HasNetAddr( const XrdNetAddr &addr,
290 std::vector<XrdNetAddr> &addresses )
291 {
292 auto itr = addresses.begin();
293 for( ; itr != addresses.end() ; ++itr )
294 {
295 if( itr->Same( &addr ) ) return true;
296 }
297
298 return false;
299 }
300
301 //------------------------------------------------------------------------
302 // Job handling the incoming messages
303 //------------------------------------------------------------------------
304 class HandleIncMsgJob: public Job
305 {
306 public:
307 HandleIncMsgJob( MsgHandler *handler ): pHandler( handler ) {};
308 virtual ~HandleIncMsgJob() {};
309 virtual void Run( void* )
310 {
311 pHandler->Process();
312 delete this;
313 }
314 private:
316 };
317
318 //------------------------------------------------------------------------
320 //------------------------------------------------------------------------
321 void OnFatalError( uint16_t subStream,
322 XRootDStatus status,
323 XrdSysMutexHelper &lock );
324
325 //------------------------------------------------------------------------
327 //------------------------------------------------------------------------
329
330 //------------------------------------------------------------------------
332 //------------------------------------------------------------------------
334
335 typedef std::vector<SubStreamData*> SubStreamList;
336
337 //------------------------------------------------------------------------
338 // Data members
339 //------------------------------------------------------------------------
340 const URL *pUrl;
342 std::string pStreamName;
358 std::vector<XrdNetAddr> pAddresses;
361 uint64_t pSessionId;
362
363 //------------------------------------------------------------------------
364 // Monitoring info
365 //------------------------------------------------------------------------
368 uint64_t pBytesSent;
370
371 //------------------------------------------------------------------------
372 // Data stream on-connect handler
373 //------------------------------------------------------------------------
374 std::shared_ptr<Job> pOnDataConnJob;
375
376 //------------------------------------------------------------------------
377 // Track last assigned Id across all Streams, to ensure unique sessionId
378 //------------------------------------------------------------------------
380 };
381}
382
383#endif // __XRD_CL_STREAM_HH__
Definition: XrdClAnyObject.hh:33
Channel event handler.
Definition: XrdClPostMasterInterfaces.hh:210
A helper for handling channel event handlers.
Definition: XrdClChannelHandlerList.hh:34
A synchronize queue for incoming data.
Definition: XrdClInQueue.hh:37
A synchronized queue.
Definition: XrdClJobManager.hh:51
Interface for a job to be run by the job manager.
Definition: XrdClJobManager.hh:34
The message representation used throughout the system.
Definition: XrdClMessage.hh:30
Message handler.
Definition: XrdClPostMasterInterfaces.hh:51
virtual void Process()
Definition: XrdClPostMasterInterfaces.hh:125
Interface for socket pollers.
Definition: XrdClPoller.hh:87
Definition: XrdClStream.hh:305
virtual void Run(void *)
The job logic.
Definition: XrdClStream.hh:309
MsgHandler * pHandler
Definition: XrdClStream.hh:315
virtual ~HandleIncMsgJob()
Definition: XrdClStream.hh:308
HandleIncMsgJob(MsgHandler *handler)
Definition: XrdClStream.hh:307
Stream.
Definition: XrdClStream.hh:51
XRootDStatus Send(Message *msg, MsgHandler *handler, bool stateful, time_t expires)
Queue the message for sending.
static bool HasNetAddr(const XrdNetAddr &addr, std::vector< XrdNetAddr > &addresses)
Check if addresses contains given address.
Definition: XrdClStream.hh:289
void SetTransport(TransportHandler *transport)
Set the transport.
Definition: XrdClStream.hh:90
StreamStatus
Status of the stream.
Definition: XrdClStream.hh:57
@ Disconnected
Not connected.
Definition: XrdClStream.hh:58
@ Error
Broken.
Definition: XrdClStream.hh:61
@ Connected
Connected.
Definition: XrdClStream.hh:59
@ Connecting
In the process of being connected.
Definition: XrdClStream.hh:60
void SetIncomingQueue(InQueue *incomingQueue)
Set the incoming queue.
Definition: XrdClStream.hh:106
timeval pConnectionStarted
Definition: XrdClStream.hh:366
std::vector< SubStreamData * > SubStreamList
Definition: XrdClStream.hh:335
bool CanCollapse(const URL &url)
InQueue * pIncomingQueue
Definition: XrdClStream.hh:348
XrdSysRecMutex pMutex
Definition: XrdClStream.hh:347
Utils::AddressType pAddressType
Definition: XrdClStream.hh:359
void SetPoller(Poller *poller)
Set the poller.
Definition: XrdClStream.hh:98
void ForceConnect()
Force connection.
XRootDStatus pLastFatalError
Definition: XrdClStream.hh:351
time_t pConnectionInitTime
Definition: XrdClStream.hh:355
void SetTaskManager(TaskManager *taskManager)
Set task manager.
Definition: XrdClStream.hh:122
void SetOnDataConnectHandler(std::shared_ptr< Job > &onConnJob)
Set the on-connect handler for data streams.
Definition: XrdClStream.hh:262
Poller * pPoller
Definition: XrdClStream.hh:344
TaskManager * pTaskManager
Definition: XrdClStream.hh:345
void SetJobManager(JobManager *jobManager)
Set job manager.
Definition: XrdClStream.hh:130
Status Query(uint16_t query, AnyObject &result)
Query the stream.
uint32_t pLastStreamError
Definition: XrdClStream.hh:350
void Disconnect(bool force=false)
Disconnect the stream.
XRootDStatus RequestClose(Message &resp)
Send close after an open request timed out.
uint16_t pConnectionRetry
Definition: XrdClStream.hh:354
void MonitorDisconnection(XRootDStatus status)
Inform the monitoring about disconnection.
XRootDStatus EnableLink(PathID &path)
Stream(const URL *url, const URL &prefer=URL())
Constructor.
void OnConnect(uint16_t subStream)
Call back when a message has been reconstructed.
const std::string & GetName() const
Return stream name.
Definition: XrdClStream.hh:169
uint64_t pBytesReceived
Definition: XrdClStream.hh:369
void Tick(time_t now)
void ForceError(XRootDStatus status)
Force error.
AnyObject * pChannelData
Definition: XrdClStream.hh:349
static RAtomic_uint64_t sSessCntGen
Definition: XrdClStream.hh:379
void OnConnectError(uint16_t subStream, XRootDStatus status)
On connect error.
std::vector< XrdNetAddr > pAddresses
Definition: XrdClStream.hh:358
~Stream()
Destructor.
const URL * pUrl
Definition: XrdClStream.hh:340
void DisableIfEmpty(uint16_t subStream)
Disables respective uplink if empty.
void RemoveEventHandler(ChannelEventHandler *handler)
Remove a channel event handler.
const URL pPrefer
Definition: XrdClStream.hh:341
JobManager * pJobManager
Definition: XrdClStream.hh:346
uint16_t pConnectionCount
Definition: XrdClStream.hh:353
void OnMessageSent(uint16_t subStream, Message *msg, uint32_t bytesSent)
void OnReadTimeout(uint16_t subStream)
On read timeout.
std::string pStreamName
Definition: XrdClStream.hh:342
MsgHandler * InstallIncHandler(std::shared_ptr< Message > &msg, uint16_t stream)
SubStreamList pSubStreams
Definition: XrdClStream.hh:357
void OnIncoming(uint16_t subStream, std::shared_ptr< Message > msg, uint32_t bytesReceived)
Call back when a message has been reconstructed.
void OnError(uint16_t subStream, XRootDStatus status)
On error.
ChannelHandlerList pChannelEvHandlers
Definition: XrdClStream.hh:360
uint64_t pBytesSent
Definition: XrdClStream.hh:368
uint16_t pConnectionWindow
Definition: XrdClStream.hh:356
TransportHandler * pTransport
Definition: XrdClStream.hh:343
uint16_t pStreamErrorWindow
Definition: XrdClStream.hh:352
const URL * GetURL() const
Get the URL.
Definition: XrdClStream.hh:156
timeval pConnectionDone
Definition: XrdClStream.hh:367
uint16_t InspectStatusRsp(uint16_t stream, MsgHandler *&incHandler)
void OnFatalError(uint16_t subStream, XRootDStatus status, XrdSysMutexHelper &lock)
On fatal error - unlocks the stream.
std::shared_ptr< Job > pOnDataConnJob
Definition: XrdClStream.hh:374
static bool IsPartial(Message &msg)
Check if message is a partial response.
void SetChannelData(AnyObject *channelData)
Set the channel data.
Definition: XrdClStream.hh:114
uint64_t pSessionId
Definition: XrdClStream.hh:361
std::pair< Message *, MsgHandler * > OnReadyToWrite(uint16_t subStream)
XRootDStatus Initialize()
Initializer.
void RegisterEventHandler(ChannelEventHandler *handler)
Register channel event handler.
void OnWriteTimeout(uint16_t subStream)
On write timeout.
Definition: XrdClTaskManager.hh:76
Perform the handshake and the authentication for each physical stream.
Definition: XrdClPostMasterInterfaces.hh:310
URL representation.
Definition: XrdClURL.hh:31
AddressType
Address type.
Definition: XrdClUtils.hh:87
Request status.
Definition: XrdClXRootDResponses.hh:219
Definition: XrdNetAddr.hh:42
Definition: XrdSysPthread.hh:263
Definition: XrdSysPthread.hh:242
Definition: XrdSysRAtomic.hh:26
Definition: XrdClAction.hh:34
Definition: XrdClPostMasterInterfaces.hh:269
Procedure execution status.
Definition: XrdClStatus.hh:115