XRootD
XrdClLocalFileHandler.cc
Go to the documentation of this file.
1 //------------------------------------------------------------------------------
2 // Copyright (c) 2017 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH
3 // Author: Paul-Niklas Kramp <p.n.kramp@gsi.de>
4 // Michal Simon <michal.simon@cern.ch>
5 //------------------------------------------------------------------------------
6 // XRootD is free software: you can redistribute it and/or modify
7 // it under the terms of the GNU Lesser General Public License as published by
8 // the Free Software Foundation, either version 3 of the License, or
9 // (at your option) any later version.
10 //
11 // XRootD is distributed in the hope that it will be useful,
12 // but WITHOUT ANY WARRANTY; without even the implied warranty of
13 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 // GNU General Public License for more details.
15 //
16 // You should have received a copy of the GNU Lesser General Public License
17 // along with XRootD. If not, see <http://www.gnu.org/licenses/>.
18 //------------------------------------------------------------------------------
20 #include "XrdCl/XrdClConstants.hh"
21 #include "XrdCl/XrdClPostMaster.hh"
22 #include "XrdCl/XrdClURL.hh"
24 #include "XrdCl/XrdClFileSystem.hh"
25 #include "XProtocol/XProtocol.hh"
26 
27 #include "XrdSys/XrdSysE2T.hh"
28 #include "XrdSys/XrdSysXAttr.hh"
29 #include "XrdSys/XrdSysFAttr.hh"
30 #include "XrdSys/XrdSysFD.hh"
31 
32 #include <string>
33 #include <memory>
34 #include <stdexcept>
35 
36 #include <fcntl.h>
37 #include <cstdio>
38 #include <cstdlib>
39 #include <unistd.h>
40 #include <sys/stat.h>
41 #include <arpa/inet.h>
42 #include <aio.h>
43 
44 namespace
45 {
46 
47  class AioCtx
48  {
49  public:
50 
51  enum Opcode
52  {
53  None,
54  Read,
55  Write,
56  Sync
57  };
58 
59  AioCtx( const XrdCl::HostList &hostList, XrdCl::ResponseHandler *handler ) :
60  opcode( None ), hosts( new XrdCl::HostList( hostList ) ), handler( handler )
61  {
62  aiocb *ptr = new aiocb();
63  memset( ptr, 0, sizeof( aiocb ) );
64 
66  int useSignals = XrdCl::DefaultAioSignal;
67  env->GetInt( "AioSignal", useSignals );
68 
69  if( useSignals )
70  {
71  static SignalHandlerRegistrator registrator; // registers the signal handler
72 
73  ptr->aio_sigevent.sigev_notify = SIGEV_SIGNAL;
74  ptr->aio_sigevent.sigev_signo = SIGUSR1;
75  }
76  else
77  {
78  ptr->aio_sigevent.sigev_notify = SIGEV_THREAD;
79  ptr->aio_sigevent.sigev_notify_function = ThreadHandler;
80  }
81 
82  ptr->aio_sigevent.sigev_value.sival_ptr = this;
83 
84  cb.reset( ptr );
85  }
86 
87 
88  void SetWrite( int fd, size_t offset, size_t size, const void *buffer )
89  {
90  cb->aio_fildes = fd;
91  cb->aio_offset = offset;
92  cb->aio_buf = const_cast<void*>( buffer );
93  cb->aio_nbytes = size;
94  opcode = Opcode::Write;
95  }
96 
97  void SetRead( int fd, size_t offset, size_t size, void *buffer )
98  {
99  cb->aio_fildes = fd;
100  cb->aio_offset = offset;
101  cb->aio_buf = buffer;
102  cb->aio_nbytes = size;
103  opcode = Opcode::Read;
104  }
105 
106  void SetFsync( int fd )
107  {
108  cb->aio_fildes = fd;
109  opcode = Opcode::Sync;
110  }
111 
112  static void ThreadHandler( sigval arg )
113  {
114  std::unique_ptr<AioCtx> me( reinterpret_cast<AioCtx*>( arg.sival_ptr ) );
115  Handler( std::move( me ) );
116  }
117 
118  static void SignalHandler( int sig, siginfo_t *info, void *ucontext )
119  {
120  std::unique_ptr<AioCtx> me( reinterpret_cast<AioCtx*>( info->si_value.sival_ptr ) );
121  Handler( std::move( me ) );
122  }
123 
124  operator aiocb*()
125  {
126  return cb.get();
127  }
128 
129  private:
130 
131  struct SignalHandlerRegistrator
132  {
133  SignalHandlerRegistrator()
134  {
135  struct sigaction newact, oldact;
136  newact.sa_sigaction = SignalHandler;
137  sigemptyset( &newact.sa_mask );
138  newact.sa_flags = SA_SIGINFO;
139  int rc = sigaction( SIGUSR1, &newact, &oldact );
140  if( rc < 0 )
141  throw std::runtime_error( XrdSysE2T( errno ) );
142  }
143  };
144 
145  static void Handler( std::unique_ptr<AioCtx> me )
146  {
147  if( me->opcode == Opcode::None )
148  return;
149 
150  using namespace XrdCl;
151 
152  int rc = aio_return( me->cb.get() );
153  if( rc < 0 )
154  {
155  int errcode = aio_error( me->cb.get() );
156  Log *log = DefaultEnv::GetLog();
157  log->Error( FileMsg, GetErrMsg( me->opcode ), XrdSysE2T( errcode ) );
158  XRootDStatus *error = new XRootDStatus( stError, errLocalError, errcode ) ;
159  QueueTask( error, 0, me->hosts, me->handler );
160  }
161  else
162  {
163  AnyObject *resp = 0;
164 
165  if( me->opcode == Opcode::Read )
166  {
167  ChunkInfo *chunk = new ChunkInfo( me->cb->aio_offset, rc,
168  const_cast<void*>( me->cb->aio_buf ) );
169  resp = new AnyObject();
170  resp->Set( chunk );
171  }
172 
173  QueueTask( new XRootDStatus(), resp, me->hosts, me->handler );
174  }
175  }
176 
177  static const char* GetErrMsg( Opcode opcode )
178  {
179  static const char readmsg[] = "Read: failed %s";
180  static const char writemsg[] = "Write: failed %s";
181  static const char syncmsg[] = "Sync: failed %s";
182 
183  switch( opcode )
184  {
185  case Opcode::Read: return readmsg;
186 
187  case Opcode::Write: return writemsg;
188 
189  case Opcode::Sync: return syncmsg;
190 
191  default: return 0;
192  }
193  }
194 
195  static void QueueTask( XrdCl::XRootDStatus *status, XrdCl::AnyObject *resp,
196  XrdCl::HostList *hosts, XrdCl::ResponseHandler *handler )
197  {
198  using namespace XrdCl;
199 
200  // if it is simply the sync handler we can release the semaphore
201  // and return there is no need to execute this in the thread-pool
202  if(SyncResponseHandler *syncHandler = dynamic_cast<SyncResponseHandler*>( handler )) {
203  syncHandler->HandleResponse( status, resp );
204  } else if(auto postmaster = DefaultEnv::GetPostMaster()) {
205  if (JobManager *jmngr = postmaster->GetJobManager()) {
206  LocalFileTask *task = new LocalFileTask( status, resp, hosts, handler );
207  jmngr->QueueJob( task );
208  }
209  }
210  }
211 
212  std::unique_ptr<aiocb> cb;
213  Opcode opcode;
214  XrdCl::HostList *hosts;
215  XrdCl::ResponseHandler *handler;
216  };
217 
218 };
219 
220 namespace XrdCl
221 {
222 
223  //------------------------------------------------------------------------
224  // Constructor
225  //------------------------------------------------------------------------
226  LocalFileHandler::LocalFileHandler() :
227  fd( -1 )
228  {
229  }
230 
231  //------------------------------------------------------------------------
232  // Destructor
233  //------------------------------------------------------------------------
235  {
236 
237  }
238 
239  //------------------------------------------------------------------------
240  // Open
241  //------------------------------------------------------------------------
242  XRootDStatus LocalFileHandler::Open( const std::string& url, uint16_t flags,
243  uint16_t mode, ResponseHandler* handler, uint16_t timeout )
244  {
245  AnyObject *resp = 0;
246  XRootDStatus st = OpenImpl( url, flags, mode, resp );
247  if( !st.IsOK() && st.code != errLocalError )
248  return st;
249 
250  return QueueTask( new XRootDStatus( st ), resp, handler );
251  }
252 
253  XRootDStatus LocalFileHandler::Open( const URL *url, const Message *req, AnyObject *&resp )
254  {
255  const ClientOpenRequest* request =
256  reinterpret_cast<const ClientOpenRequest*>( req->GetBuffer() );
257  uint16_t flags = ntohs( request->options );
258  uint16_t mode = ntohs( request->mode );
259  return OpenImpl( url->GetURL(), flags, mode, resp );
260  }
261 
262  //------------------------------------------------------------------------
263  // Close
264  //------------------------------------------------------------------------
266  uint16_t timeout )
267  {
268  if( close( fd ) == -1 )
269  {
270  Log *log = DefaultEnv::GetLog();
271  log->Error( FileMsg, "Close: file fd: %i %s", fd, XrdSysE2T( errno ) );
272  XRootDStatus *error = new XRootDStatus( stError, errLocalError, errno );
273  return QueueTask( error, 0, handler );
274  }
275 
276  return QueueTask( new XRootDStatus(), 0, handler );
277  }
278 
279  //------------------------------------------------------------------------
280  // Stat
281  //------------------------------------------------------------------------
283 uint16_t timeout )
284 {
// Stats the open descriptor and reports the result through QueueTask.
285 Log *log = DefaultEnv::GetLog();
286 
287 struct stat ssp;
288 if( fstat( fd, &ssp ) == -1 )
289 {
// NOTE(review): errno is read again after log->Error - confirm the logger cannot modify it.
290 log->Error( FileMsg, "Stat: failed fd: %i %s", fd, XrdSysE2T( errno ) );
291 XRootDStatus *error = new XRootDStatus( stError, errLocalError, errno );
292 return QueueTask( error, 0, handler );
293 }
// The stat fields are rendered as "<dev> <size> <mode> <mtime>" text so
// that StatInfo::ParseServerResponse can be used to fill the StatInfo.
294 std::ostringstream data;
295 data << ssp.st_dev << " " << ssp.st_size << " " << ssp.st_mode << " "
296 << ssp.st_mtime;
297 log->Debug( FileMsg, "%s", data.str().c_str() );
298 
299 StatInfo *statInfo = new StatInfo();
300 if( !statInfo->ParseServerResponse( data.str().c_str() ) )
301 {
302 log->Error( FileMsg, "Stat: ParseServerResponse failed." );
303 delete statInfo;
305 0, handler );
306 }
307 
// resp->Set() stores the StatInfo pointer in the response object.
308 AnyObject *resp = new AnyObject();
309 resp->Set( statInfo );
310 return QueueTask( new XRootDStatus(), resp, handler );
311 }
312 
313  //------------------------------------------------------------------------
314  // Read
315  //------------------------------------------------------------------------
316  XRootDStatus LocalFileHandler::Read( uint64_t offset, uint32_t size,
317  void* buffer, ResponseHandler* handler, uint16_t timeout )
318  {
319 #if defined(__APPLE__)
320  Log *log = DefaultEnv::GetLog();
321  int read = 0;
322  if( ( read = pread( fd, buffer, size, offset ) ) == -1 )
323  {
324  log->Error( FileMsg, "Read: failed %s", XrdSysE2T( errno ) );
325  XRootDStatus *error = new XRootDStatus( stError, errLocalError, errno );
326  return QueueTask( error, 0, handler );
327  }
328  ChunkInfo *chunk = new ChunkInfo( offset, read, buffer );
329  AnyObject *resp = new AnyObject();
330  resp->Set( chunk );
331  return QueueTask( new XRootDStatus(), resp, handler );
332 #else
333  AioCtx *ctx = new AioCtx( pHostList, handler );
334  ctx->SetRead( fd, offset, size, buffer );
335 
336  int rc = aio_read( *ctx );
337 
338  if( rc < 0 )
339  {
340  Log *log = DefaultEnv::GetLog();
341  log->Error( FileMsg, "Read: failed %s", XrdSysE2T( errno ) );
342  return XRootDStatus( stError, errLocalError, errno );
343  }
344 
345  return XRootDStatus();
346 #endif
347  }
348 
349 
350  //------------------------------------------------------------------------
351  // ReadV
352  //------------------------------------------------------------------------
354  struct iovec *iov,
355  int iovcnt,
356  ResponseHandler *handler,
357  uint16_t timeout )
358  {
359  Log *log = DefaultEnv::GetLog();
360 #if defined(__APPLE__)
361  ssize_t ret = lseek( fd, offset, SEEK_SET );
362  if( ret >= 0 )
363  ret = readv( fd, iov, iovcnt );
364 #else
365  ssize_t ret = preadv( fd, iov, iovcnt, offset );
366 #endif
367  if( ret == -1 )
368  {
369  log->Error( FileMsg, "ReadV: failed %s", XrdSysE2T( errno ) );
370  XRootDStatus *error = new XRootDStatus( stError, errLocalError, errno );
371  return QueueTask( error, 0, handler );
372  }
373  VectorReadInfo *info = new VectorReadInfo();
374  info->SetSize( ret );
375  uint64_t choff = offset;
376  uint32_t left = ret;
377  for( int i = 0; i < iovcnt; ++i )
378  {
379  uint32_t chlen = iov[i].iov_len;
380  if( chlen > left ) chlen = left;
381  info->GetChunks().emplace_back( choff, chlen, iov[i].iov_base);
382  left -= chlen;
383  choff += chlen;
384  }
385  AnyObject *resp = new AnyObject();
386  resp->Set( info );
387  return QueueTask( new XRootDStatus(), resp, handler );
388  }
389 
390  //------------------------------------------------------------------------
391  // Write
392  //------------------------------------------------------------------------
393  XRootDStatus LocalFileHandler::Write( uint64_t offset, uint32_t size,
394  const void* buffer, ResponseHandler* handler, uint16_t timeout )
395  {
396 #if defined(__APPLE__)
397  const char *buff = reinterpret_cast<const char*>( buffer );
398  size_t bytesWritten = 0;
399  while( bytesWritten < size )
400  {
401  ssize_t ret = pwrite( fd, buff, size, offset );
402  if( ret < 0 )
403  {
404  Log *log = DefaultEnv::GetLog();
405  log->Error( FileMsg, "Write: failed %s", XrdSysE2T( errno ) );
406  XRootDStatus *error = new XRootDStatus( stError, errLocalError, errno );
407  return QueueTask( error, 0, handler );
408  }
409  offset += ret;
410  buff += ret;
411  bytesWritten += ret;
412  }
413  return QueueTask( new XRootDStatus(), 0, handler );
414 #else
415  AioCtx *ctx = new AioCtx( pHostList, handler );
416  ctx->SetWrite( fd, offset, size, buffer );
417 
418  int rc = aio_write( *ctx );
419 
420  if( rc < 0 )
421  {
422  Log *log = DefaultEnv::GetLog();
423  log->Error( FileMsg, "Write: failed %s", XrdSysE2T( errno ) );
424  return XRootDStatus( stError, errLocalError, errno );
425  }
426 
427  return XRootDStatus();
428 #endif
429  }
430 
431  //------------------------------------------------------------------------
432  // Sync
433  //------------------------------------------------------------------------
435 uint16_t timeout )
436 {
// Flushes the descriptor: fsync in place on __APPLE__, aio_fsync elsewhere.
437 #if defined(__APPLE__)
438 if( fsync( fd ) )
439 {
440 Log *log = DefaultEnv::GetLog();
441 log->Error( FileMsg, "Sync: failed %s", XrdSysE2T( errno ) );
443 XProtocol::mapError( errno ),
444 XrdSysE2T( errno ) );
445 return QueueTask( error, 0, handler );
446 }
447 return QueueTask( new XRootDStatus(), 0, handler );
448 #else
// The context is heap-allocated; AioCtx's notification callbacks take
// ownership of it once the request completes.
449 AioCtx *ctx = new AioCtx( pHostList, handler );
450 ctx->SetFsync( fd );
451 int rc = aio_fsync( O_SYNC, *ctx );
452 if( rc < 0 )
453 {
// NOTE(review): ctx is not deleted on this path although no completion
// notification is expected after a failed submission - looks like a leak.
454 Log *log = DefaultEnv::GetLog();
455 log->Error( FileMsg, "Sync: failed %s", XrdSysE2T( errno ) );
456 return XRootDStatus( stError, errLocalError, errno );
457 }
458 #endif
// Reached only in the aio branch: the result is delivered asynchronously.
459 return XRootDStatus();
460 }
461 
462  //------------------------------------------------------------------------
463  // Truncate
464  //------------------------------------------------------------------------
466  ResponseHandler* handler, uint16_t timeout )
467  {
468  if( ftruncate( fd, size ) )
469  {
470  Log *log = DefaultEnv::GetLog();
471  log->Error( FileMsg, "Truncate: failed, file descriptor: %i, %s", fd,
472  XrdSysE2T( errno ) );
473  XRootDStatus *error = new XRootDStatus( stError, errLocalError, errno );
474  return QueueTask( error, 0, handler );
475  }
476 
477  return QueueTask( new XRootDStatus( stOK ), 0, handler );
478  }
479 
480  //------------------------------------------------------------------------
481  // VectorRead
482  //------------------------------------------------------------------------
484  void* buffer, ResponseHandler* handler, uint16_t timeout )
485  {
486  std::unique_ptr<VectorReadInfo> info( new VectorReadInfo() );
487  size_t totalSize = 0;
488  bool useBuffer( buffer );
489 
490  for( auto itr = chunks.begin(); itr != chunks.end(); ++itr )
491  {
492  auto &chunk = *itr;
493  if( !useBuffer )
494  buffer = chunk.buffer;
495  ssize_t bytesRead = pread( fd, buffer, chunk.length,
496  chunk.offset );
497  if( bytesRead < 0 )
498  {
499  Log *log = DefaultEnv::GetLog();
500  log->Error( FileMsg, "VectorRead: failed, file descriptor: %i, %s",
501  fd, XrdSysE2T( errno ) );
502  XRootDStatus *error = new XRootDStatus( stError, errLocalError, errno );
503  return QueueTask( error, 0, handler );
504  }
505  totalSize += bytesRead;
506  info->GetChunks().push_back( ChunkInfo( chunk.offset, bytesRead, buffer ) );
507  if( useBuffer )
508  buffer = reinterpret_cast<char*>( buffer ) + bytesRead;
509  }
510 
511  info->SetSize( totalSize );
512  AnyObject *resp = new AnyObject();
513  resp->Set( info.release() );
514  return QueueTask( new XRootDStatus(), resp, handler );
515  }
516 
517  //------------------------------------------------------------------------
518  // VectorWrite
519  //------------------------------------------------------------------------
521  ResponseHandler *handler, uint16_t timeout )
522  {
523 
524  for( auto itr = chunks.begin(); itr != chunks.end(); ++itr )
525  {
526  auto &chunk = *itr;
527  ssize_t bytesWritten = pwrite( fd, chunk.buffer, chunk.length,
528  chunk.offset );
529  if( bytesWritten < 0 )
530  {
531  Log *log = DefaultEnv::GetLog();
532  log->Error( FileMsg, "VectorWrite: failed, file descriptor: %i, %s",
533  fd, XrdSysE2T( errno ) );
534  XRootDStatus *error = new XRootDStatus( stError, errLocalError, errno );
535  return QueueTask( error, 0, handler );
536  }
537  }
538 
539  return QueueTask( new XRootDStatus(), 0, handler );
540  }
541 
542  //------------------------------------------------------------------------
543  // WriteV
544  //------------------------------------------------------------------------
546  ChunkList *chunks,
547  ResponseHandler *handler,
548  uint16_t timeout )
549  {
550  size_t iovcnt = chunks->size();
551  iovec iovcp[iovcnt];
552  ssize_t size = 0;
553  for( size_t i = 0; i < iovcnt; ++i )
554  {
555  iovcp[i].iov_base = (*chunks)[i].buffer;
556  iovcp[i].iov_len = (*chunks)[i].length;
557  size += (*chunks)[i].length;
558  }
559  iovec *iovptr = iovcp;
560 
561  ssize_t bytesWritten = 0;
562  while( bytesWritten < size )
563  {
564 #ifdef __APPLE__
565  ssize_t ret = lseek( fd, offset, SEEK_SET );
566  if( ret >= 0 )
567  ret = writev( fd, iovptr, iovcnt );
568 #else
569  ssize_t ret = pwritev( fd, iovptr, iovcnt, offset );
570 #endif
571  if( ret < 0 )
572  {
573  Log *log = DefaultEnv::GetLog();
574  log->Error( FileMsg, "WriteV: failed %s", XrdSysE2T( errno ) );
575  XRootDStatus *error = new XRootDStatus( stError, errLocalError, errno );
576  return QueueTask( error, 0, handler );
577  }
578 
579  bytesWritten += ret;
580  while( ret )
581  {
582  if( size_t( ret ) > iovptr[0].iov_len )
583  {
584  ret -= iovptr[0].iov_len;
585  --iovcnt;
586  ++iovptr;
587  }
588  else
589  {
590  iovptr[0].iov_len -= ret;
591  iovptr[0].iov_base = reinterpret_cast<char*>( iovptr[0].iov_base ) + ret;
592  ret = 0;
593  }
594  }
595  }
596 
597  return QueueTask( new XRootDStatus(), 0, handler );
598  }
599 
600  //------------------------------------------------------------------------
601  // Fcntl
602  //------------------------------------------------------------------------
604  ResponseHandler *handler, uint16_t timeout )
605  {
607  }
608 
609  //------------------------------------------------------------------------
610  // Visa
611  //------------------------------------------------------------------------
613  uint16_t timeout )
614  {
616  }
617 
618  //------------------------------------------------------------------------
619  // Set extended attributes - async
620  //------------------------------------------------------------------------
621  XRootDStatus LocalFileHandler::SetXAttr( const std::vector<xattr_t> &attrs,
622  ResponseHandler *handler,
623  uint16_t timeout )
624  {
625  XrdSysXAttr *xattr = XrdSysFAttr::Xat;
626  std::vector<XAttrStatus> response;
627 
628  auto itr = attrs.begin();
629  for( ; itr != attrs.end(); ++itr )
630  {
631  std::string name = std::get<xattr_name>( *itr );
632  std::string value = std::get<xattr_value>( *itr );
633  int err = xattr->Set( name.c_str(), value.c_str(), value.size(), 0, fd );
634  XRootDStatus status = err < 0 ? XRootDStatus( stError, errLocalError, -err ) :
635  XRootDStatus();
636 
637  response.push_back( XAttrStatus( name, status ) );
638  }
639 
640  AnyObject *resp = new AnyObject();
641  resp->Set( new std::vector<XAttrStatus>( std::move( response ) ) );
642 
643  return QueueTask( new XRootDStatus(), resp, handler );
644  }
645 
646  //------------------------------------------------------------------------
647  // Get extended attributes - async
648  //------------------------------------------------------------------------
649  XRootDStatus LocalFileHandler::GetXAttr( const std::vector<std::string> &attrs,
650  ResponseHandler *handler,
651  uint16_t timeout )
652  {
653  XrdSysXAttr *xattr = XrdSysFAttr::Xat;
654  std::vector<XAttr> response;
655 
656  auto itr = attrs.begin();
657  for( ; itr != attrs.end(); ++itr )
658  {
659  std::string name = *itr;
660  std::unique_ptr<char[]> buffer;
661 
662  int size = xattr->Get( name.c_str(), 0, 0, 0, fd );
663  if( size < 0 )
664  {
665  XRootDStatus status( stError, errLocalError, -size );
666  response.push_back( XAttr( *itr, "", status ) );
667  continue;
668  }
669  buffer.reset( new char[size] );
670  int ret = xattr->Get( name.c_str(), buffer.get(), size, 0, fd );
671 
672  XRootDStatus status;
673  std::string value;
674 
675  if( ret >= 0 )
676  value.append( buffer.get(), ret );
677  else if( ret < 0 )
678  status = XRootDStatus( stError, errLocalError, -ret );
679 
680  response.push_back( XAttr( *itr, value, status ) );
681  }
682 
683  AnyObject *resp = new AnyObject();
684  resp->Set( new std::vector<XAttr>( std::move( response ) ) );
685 
686  return QueueTask( new XRootDStatus(), resp, handler );
687  }
688 
689  //------------------------------------------------------------------------
690  // Delete extended attributes - async
691  //------------------------------------------------------------------------
692  XRootDStatus LocalFileHandler::DelXAttr( const std::vector<std::string> &attrs,
693  ResponseHandler *handler,
694  uint16_t timeout )
695  {
696  XrdSysXAttr *xattr = XrdSysFAttr::Xat;
697  std::vector<XAttrStatus> response;
698 
699  auto itr = attrs.begin();
700  for( ; itr != attrs.end(); ++itr )
701  {
702  std::string name = *itr;
703  int err = xattr->Del( name.c_str(), 0, fd );
704  XRootDStatus status = err < 0 ? XRootDStatus( stError, errLocalError, -err ) :
705  XRootDStatus();
706 
707  response.push_back( XAttrStatus( name, status ) );
708  }
709 
710  AnyObject *resp = new AnyObject();
711  resp->Set( new std::vector<XAttrStatus>( std::move( response ) ) );
712 
713  return QueueTask( new XRootDStatus(), resp, handler );
714  }
715 
716  //------------------------------------------------------------------------
717  // List extended attributes - async
718  //------------------------------------------------------------------------
720 uint16_t timeout )
721 {
// Lists the extended attributes of the open descriptor and fetches the
// value of each one; the result is a vector of XAttr handed to QueueTask.
722 XrdSysXAttr *xattr = XrdSysFAttr::Xat;
723 std::vector<XAttr> response;
724 
725 XrdSysXAttr::AList *alist = 0;
726 int err = xattr->List( &alist, 0, fd, 1 );
727 
728 if( err < 0 )
729 {
// a negative return value is the negated error code
730 XRootDStatus *status = new XRootDStatus( stError, XProtocol::mapError( -err ) );
731 return QueueTask( status, 0, handler );
732 }
733 
// Walk the linked list returned by List(); each node supplies the
// attribute name and the length used to size the value buffer.
734 XrdSysXAttr::AList *ptr = alist;
735 while( ptr )
736 {
737 std::string name( ptr->Name, ptr->Nlen );
738 int vlen = ptr->Vlen;
739 ptr = ptr->Next;
740 
741 std::unique_ptr<char[]> buffer( new char[vlen] );
742 int ret = xattr->Get( name.c_str(),
743 buffer.get(), vlen, 0, fd );
744 
745 std::string value = ret >= 0 ? std::string( buffer.get(), ret ) :
746 std::string();
747 XRootDStatus status = ret >= 0 ? XRootDStatus() :
749 response.push_back( XAttr( name, value, status ) );
750 }
// the list was allocated by List() and is handed back to Free()
751 xattr->Free( alist );
752 
753 AnyObject *resp = new AnyObject();
754 resp->Set( new std::vector<XAttr>( std::move( response ) ) );
755 
756 return QueueTask( new XRootDStatus(), resp, handler );
757 }
758 
759  //------------------------------------------------------------------------
760  // QueueTask - queues error/success tasks for all operations.
761  // Must always return stOK.
762  // Is always creating the same HostList containing only localhost.
763  //------------------------------------------------------------------------
765  ResponseHandler *handler )
766  {
767  // if it is simply the sync handler we can release the semaphore
768  // and return there is no need to execute this in the thread-pool
769  if (SyncResponseHandler *syncHandler = dynamic_cast<SyncResponseHandler*>(handler)) {
770  syncHandler->HandleResponse( st, resp );
771  return XRootDStatus();
772  }
773 
774  if (auto postmaster = DefaultEnv::GetPostMaster()) {
775  HostList *hosts = pHostList.empty() ? 0 : new HostList( pHostList );
776  LocalFileTask *task = new LocalFileTask( st, resp, hosts, handler );
777  postmaster->GetJobManager()->QueueJob( task );
778  }
779  return XRootDStatus();
780  }
781 
782  //------------------------------------------------------------------------
783  // MkdirPath - creates the folders specified in file_path
784  // called if kXR_mkdir flag is set
785  //------------------------------------------------------------------------
786  XRootDStatus LocalFileHandler::MkdirPath( const std::string &path )
787  {
788  // first find the most up-front component that exists
789  size_t pos = path.rfind( '/' );
790  while( pos != std::string::npos && pos != 0 )
791  {
792  std::string tmp = path.substr( 0, pos );
793  struct stat st;
794  int rc = lstat( tmp.c_str(), &st );
795  if( rc == 0 ) break;
796  if( errno != ENOENT )
797  return XRootDStatus( stError, errLocalError, errno );
798  pos = path.rfind( '/', pos - 1 );
799  }
800 
801  pos = path.find( '/', pos + 1 );
802  while( pos != std::string::npos && pos != 0 )
803  {
804  std::string tmp = path.substr( 0, pos );
805  if( mkdir( tmp.c_str(), 0755 ) )
806  {
807  if( errno != EEXIST )
808  return XRootDStatus( stError, errLocalError, errno );
809  }
810  pos = path.find( '/', pos + 1 );
811  }
812  return XRootDStatus();
813  }
814 
// Opens the local file named by `url`, translating the kXR_* open flags
// into open(2) flags, and on success stores an OpenInfo (descriptor plus
// cached StatInfo) in `resp`. Also records the URL in pUrl and pHostList.
815 XRootDStatus LocalFileHandler::OpenImpl( const std::string &url, uint16_t flags,
816 uint16_t mode, AnyObject *&resp)
817 {
818 Log *log = DefaultEnv::GetLog();
819 
820 // save the file URL for the HostList for later
821 pUrl = url;
822 
823 URL fileUrl( url );
824 if( !fileUrl.IsValid() )
826 
827 if( fileUrl.GetHostName() != "localhost" )
829 
830 std::string path = fileUrl.GetPath();
831 
832 //---------------------------------------------------------------------
833 // Prepare Flags
834 //---------------------------------------------------------------------
// kXR_new    -> O_CREAT | O_EXCL
// kXR_delete -> O_CREAT | O_TRUNC
// access mode: kXR_open_wrto wins over kXR_open_updt, default is O_RDONLY
835 uint16_t openflags = 0;
836 if( flags & kXR_new )
837 openflags |= O_CREAT | O_EXCL;
838 if( flags & kXR_open_wrto )
839 openflags |= O_WRONLY;
840 else if( flags & kXR_open_updt )
841 openflags |= O_RDWR;
842 else
843 openflags |= O_RDONLY;
844 if( flags & kXR_delete )
845 openflags |= O_CREAT | O_TRUNC;
846 
// NOTE(review): the directory path is also created when only kXR_async is
// set - confirm that including kXR_async in this mask is intended.
847 if( flags & (kXR_mkpath | kXR_async) )
848 {
849 XRootDStatus st = MkdirPath( path );
850 if( !st.IsOK() )
851 {
852 log->Error( FileMsg, "Open MkdirPath failed %s: %s", path.c_str(),
853 XrdSysE2T( st.errNo ) );
854 return st;
855 }
856 
857 }
858 //---------------------------------------------------------------------
859 // Open File
860 //---------------------------------------------------------------------
// a mode of Access::Mode::None is replaced by 0644
861 if( mode == Access::Mode::None)
862 mode = 0644;
863 fd = XrdSysFD_Open( path.c_str(), openflags, mode );
864 if( fd == -1 )
865 {
866 log->Error( FileMsg, "Open: open failed: %s: %s", path.c_str(),
867 XrdSysE2T( errno ) );
868 
870 XProtocol::mapError( errno ) );
871 }
872 //---------------------------------------------------------------------
873 // Stat File and cache statInfo in openInfo
874 //---------------------------------------------------------------------
875 struct stat ssp;
876 if( fstat( fd, &ssp ) == -1 )
877 {
878 log->Error( FileMsg, "Open: stat failed." );
880 XProtocol::mapError( errno ) );
881 }
882 
// The stat fields are rendered as "<dev> <size> <mode> <mtime>" text so
// that StatInfo::ParseServerResponse can be used to fill the StatInfo.
883 std::ostringstream data;
884 data << ssp.st_dev << " " << ssp.st_size << " " << ssp.st_mode << " "
885 << ssp.st_mtime;
886 
887 StatInfo *statInfo = new StatInfo();
888 if( !statInfo->ParseServerResponse( data.str().c_str() ) )
889 {
890 log->Error( FileMsg, "Open: ParseServerResponse failed." );
891 delete statInfo;
893 }
894 
895 // add the URL to hosts list
896 pHostList.push_back( HostInfo( pUrl, false ) );
897 
898 //All went well
// the descriptor is widened to 32 bits and its address is passed to
// OpenInfo together with the cached stat information
899 uint32_t ufd = fd;
900 OpenInfo *openInfo = new OpenInfo( (uint8_t*)&ufd, 1, statInfo );
901 resp = new AnyObject();
902 resp->Set( openInfo );
903 return XRootDStatus();
904 }
905 
906  //------------------------------------------------------------------------
907  // Parses kXR_fattr request and calls respective XAttr operation
908  //------------------------------------------------------------------------
// Decodes the body of a kXR_fattr request and dispatches to GetXAttr,
// DelXAttr, SetXAttr or ListXAttr depending on `code`.
//   numattr : number of attribute entries encoded in the body
//   bodylen : number of bytes available at `body`
// Returns errDataError when the body is shorter than the encoding requires.
909 XRootDStatus LocalFileHandler::XAttrImpl( kXR_char code,
910 kXR_char numattr,
911 size_t bodylen,
912 char *body,
913 ResponseHandler *handler )
914 {
915 // shift body by 1 to omit the empty path
916 if( bodylen > 0 )
917 {
918 ++body;
919 --bodylen;
920 }
921 
922 switch( code )
923 {
// Get and Del share the same payload: a vector of attribute names.
924 case kXR_fattrGet:
925 case kXR_fattrDel:
926 {
927 std::vector<std::string> attrs;
928 // parse namevec
929 for( kXR_char i = 0; i < numattr; ++i )
930 {
931 if( bodylen < sizeof( kXR_unt16 ) ) return XRootDStatus( stError, errDataError );
932 // shift by RC size
933 body += sizeof( kXR_unt16 );
934 bodylen -= sizeof( kXR_unt16 );
935 // get the size of attribute name
// NOTE(review): strlen runs before the length check below, so it relies on
// a NUL terminator being present within the buffer - confirm.
936 size_t len = strlen( body );
937 if( len > bodylen ) return XRootDStatus( stError, errDataError );
938 attrs.push_back( std::string( body, len ) );
939 body += len + 1; // +1 for the null terminating the string
// NOTE(review): when len == bodylen this subtraction wraps around (size_t).
940 bodylen -= len + 1; // +1 for the null terminating the string
941 }
942 
943 if( code == kXR_fattrGet )
944 return GetXAttr( attrs, handler );
945 
946 return DelXAttr( attrs, handler );
947 }
948 
// Set carries a name vector followed by a value vector of equal length.
949 case kXR_fattrSet:
950 {
951 std::vector<xattr_t> attrs;
952 // parse namevec
953 for( kXR_char i = 0; i < numattr; ++i )
954 {
955 if( bodylen < sizeof( kXR_unt16 ) ) return XRootDStatus( stError, errDataError );
956 // shift by RC size
957 body += sizeof( kXR_unt16 );
958 bodylen -= sizeof( kXR_unt16 );
959 // get the size of attribute name
// NVecRead hands back a buffer in `name` that is released with free() below.
960 char *name = 0;
961 body = ClientFattrRequest::NVecRead( body, name );
962 attrs.push_back( std::make_tuple( std::string( name ), std::string() ) );
963 bodylen -= strlen( name ) + 1; // +1 for the null terminating the string
964 free( name );
965 }
966 // parse valuevec
967 for( kXR_char i = 0; i < numattr; ++i )
968 {
969 // get value length
970 if( bodylen < sizeof( kXR_int32 ) ) return XRootDStatus( stError, errDataError );
971 kXR_int32 len = 0;
972 body = ClientFattrRequest::VVecRead( body, len );
973 bodylen -= sizeof( kXR_int32 );
974 // get value
975 if( size_t( len ) > bodylen ) return XRootDStatus( stError, errDataError );
// VVecRead hands back a buffer in `value` that is released with free() below.
976 char *value = 0;
977 body = ClientFattrRequest::VVecRead( body, len, value );
978 bodylen -= len;
// the i-th value is stored in the tuple created for the i-th name
979 std::get<xattr_value>( attrs[i] ) = value;
980 free( value );
981 }
982 
983 return SetXAttr( attrs, handler );
984 }
985 
986 case kXR_fattrList:
987 {
988 return ListXAttr( handler );
989 }
990 
991 default:
993 }
994 
995 return XRootDStatus();
996 }
997 
999 Message *msg,
1000 ResponseHandler *handler,
1001 MessageSendParams &sendParams )
1002 {
// Dispatches a request message to the matching local file operation,
// selected by the request id found in the message header.
1003 ClientRequest *req = reinterpret_cast<ClientRequest*>( msg->GetBuffer() );
1004 
1005 switch( req->header.requestid )
1006 {
1007 case kXR_open:
1008 {
// NOTE(review): options/mode are passed without ntohs here, whereas
// Open( const URL*, const Message*, ... ) converts them - confirm the byte
// order of the message at this point.
1009 XRootDStatus st = Open( url.GetURL(), req->open.options,
1010 req->open.mode, handler, sendParams.timeout );
1011 delete msg; // in case of other operations msg is owned by the handler
1012 return st;
1013 }
1014 
1015 case kXR_close:
1016 {
1017 return Close( handler, sendParams.timeout );
1018 }
1019 
1020 case kXR_stat:
1021 {
1022 return Stat( handler, sendParams.timeout );
1023 }
1024 
1025 case kXR_read:
1026 {
// A kXR_read message whose virtual request id is kXR_virtReadv is served
// by ReadV, using the chunk list as the scatter buffers.
1027 if( msg->GetVirtReqID() == kXR_virtReadv )
1028 {
1029 auto &chunkList = *sendParams.chunkList;
// variable-length array (compiler extension in C++), one iovec per chunk
1030 struct iovec iov[chunkList.size()];
1031 for( size_t i = 0; i < chunkList.size() ; ++i )
1032 {
1033 iov[i].iov_base = chunkList[i].buffer;
1034 iov[i].iov_len = chunkList[i].length;
1035 }
// the read starts at the offset of the first chunk
1036 return ReadV( chunkList.front().offset, iov, chunkList.size(),
1037 handler, sendParams.timeout );
1038 }
1039 
// ordinary read: the destination is the buffer of the first chunk
1040 return Read( req->read.offset, req->read.rlen,
1041 sendParams.chunkList->front().buffer,
1042 handler, sendParams.timeout );
1043 }
1044 
1045 case kXR_write:
1046 {
1047 ChunkList *chunks = sendParams.chunkList;
1048 if( chunks->size() == 1 )
1049 {
1050 // it's an ordinary write
1051 return Write( req->write.offset, req->write.dlen,
1052 chunks->front().buffer, handler,
1053 sendParams.timeout );
1054 }
1055 // it's WriteV call
1056 return WriteV( req->write.offset, sendParams.chunkList,
1057 handler, sendParams.timeout );
1058 }
1059 
1060 case kXR_sync:
1061 {
1062 return Sync( handler, sendParams.timeout );
1063 }
1064 
1065 case kXR_truncate:
1066 {
1067 return Truncate( req->truncate.offset, handler, sendParams.timeout );
1068 }
1069 
1070 case kXR_writev:
1071 {
1072 return VectorWrite( *sendParams.chunkList, handler,
1073 sendParams.timeout );
1074 }
1075 
1076 case kXR_readv:
1077 {
// a null buffer makes VectorRead read into each chunk's own buffer
1078 return VectorRead( *sendParams.chunkList, 0,
1079 handler, sendParams.timeout );
1080 }
1081 
1082 case kXR_fattr:
1083 {
// the attribute payload starts right behind the request header
1084 return XAttrImpl( req->fattr.subcode, req->fattr.numattr, req->fattr.dlen,
1085 msg->GetBuffer( sizeof(ClientRequestHdr ) ), handler );
1086 }
1087 
1088 default:
1089 {
1091 }
1092 }
1093 }
1094 }
@ kXR_FSError
Definition: XProtocol.hh:995
struct ClientTruncateRequest truncate
Definition: XProtocol.hh:875
@ kXR_fattrDel
Definition: XProtocol.hh:270
@ kXR_fattrSet
Definition: XProtocol.hh:273
@ kXR_fattrList
Definition: XProtocol.hh:272
@ kXR_fattrGet
Definition: XProtocol.hh:271
struct ClientFattrRequest fattr
Definition: XProtocol.hh:854
kXR_int64 offset
Definition: XProtocol.hh:646
@ kXR_virtReadv
Definition: XProtocol.hh:150
kXR_unt16 options
Definition: XProtocol.hh:481
@ kXR_open_wrto
Definition: XProtocol.hh:469
@ kXR_async
Definition: XProtocol.hh:458
@ kXR_delete
Definition: XProtocol.hh:453
@ kXR_open_updt
Definition: XProtocol.hh:457
@ kXR_mkpath
Definition: XProtocol.hh:460
@ kXR_new
Definition: XProtocol.hh:455
struct ClientOpenRequest open
Definition: XProtocol.hh:860
struct ClientRequestHdr header
Definition: XProtocol.hh:846
kXR_unt16 requestid
Definition: XProtocol.hh:157
@ kXR_read
Definition: XProtocol.hh:125
@ kXR_open
Definition: XProtocol.hh:122
@ kXR_writev
Definition: XProtocol.hh:143
@ kXR_readv
Definition: XProtocol.hh:137
@ kXR_sync
Definition: XProtocol.hh:128
@ kXR_fattr
Definition: XProtocol.hh:132
@ kXR_write
Definition: XProtocol.hh:131
@ kXR_truncate
Definition: XProtocol.hh:140
@ kXR_stat
Definition: XProtocol.hh:129
@ kXR_close
Definition: XProtocol.hh:115
struct ClientReadRequest read
Definition: XProtocol.hh:867
kXR_unt16 mode
Definition: XProtocol.hh:480
kXR_int64 offset
Definition: XProtocol.hh:808
struct ClientWriteRequest write
Definition: XProtocol.hh:876
kXR_int32 rlen
Definition: XProtocol.hh:647
int kXR_int32
Definition: XPtypes.hh:89
unsigned short kXR_unt16
Definition: XPtypes.hh:67
unsigned char kXR_char
Definition: XPtypes.hh:65
ssize_t pwrite(int fildes, const void *buf, size_t nbyte, off_t offset)
off_t lseek(int fildes, off_t offset, int whence)
int stat(const char *path, struct stat *buf)
int ftruncate(int fildes, off_t offset)
ssize_t pread(int fildes, void *buf, size_t nbyte, off_t offset)
int fstat(int fildes, struct stat *buf)
int lstat(const char *path, struct stat *buf)
int mkdir(const char *path, mode_t mode)
int fsync(int fildes)
ssize_t readv(int fildes, const struct iovec *iov, int iovcnt)
ssize_t writev(int fildes, const struct iovec *iov, int iovcnt)
ssize_t read(int fildes, void *buf, size_t nbyte)
#define close(a)
Definition: XrdPosix.hh:48
struct sigevent aio_sigevent
Definition: XrdSfsAio.hh:51
const char * XrdSysE2T(int errcode)
Definition: XrdSysE2T.cc:104
static int mapError(int rc)
Definition: XProtocol.hh:1362
void Set(Type object, bool own=true)
Binary blob representation.
Definition: XrdClBuffer.hh:34
const char * GetBuffer(uint32_t offset=0) const
Get the message buffer.
Definition: XrdClBuffer.hh:72
static Log * GetLog()
Get default log.
static PostMaster * GetPostMaster()
Get default post master.
static Env * GetEnv()
Get default client environment.
bool GetInt(const std::string &key, int &value)
Definition: XrdClEnv.cc:89
A synchronized queue.
XRootDStatus Truncate(uint64_t size, ResponseHandler *handler, uint16_t timeout=0)
XRootDStatus VectorRead(const ChunkList &chunks, void *buffer, ResponseHandler *handler, uint16_t timeout=0)
XRootDStatus Stat(ResponseHandler *handler, uint16_t timeout=0)
XRootDStatus Sync(ResponseHandler *handler, uint16_t timeout=0)
XRootDStatus ReadV(uint64_t offset, struct iovec *iov, int iovcnt, ResponseHandler *handler, uint16_t timeout=0)
XRootDStatus QueueTask(XRootDStatus *st, AnyObject *obj, ResponseHandler *handler)
XRootDStatus SetXAttr(const std::vector< xattr_t > &attrs, ResponseHandler *handler, uint16_t timeout=0)
static XRootDStatus MkdirPath(const std::string &path)
XRootDStatus Read(uint64_t offset, uint32_t size, void *buffer, ResponseHandler *handler, uint16_t timeout=0)
XRootDStatus WriteV(uint64_t offset, ChunkList *chunks, ResponseHandler *handler, uint16_t timeout=0)
XRootDStatus DelXAttr(const std::vector< std::string > &attrs, ResponseHandler *handler, uint16_t timeout=0)
XRootDStatus ListXAttr(ResponseHandler *handler, uint16_t timeout=0)
XRootDStatus Fcntl(const Buffer &arg, ResponseHandler *handler, uint16_t timeout=0)
XRootDStatus Close(ResponseHandler *handler, uint16_t timeout=0)
XRootDStatus Visa(ResponseHandler *handler, uint16_t timeout=0)
XRootDStatus Write(uint64_t offset, uint32_t size, const void *buffer, ResponseHandler *handler, uint16_t timeout=0)
XRootDStatus VectorWrite(const ChunkList &chunks, ResponseHandler *handler, uint16_t timeout=0)
XRootDStatus ExecRequest(const URL &url, Message *msg, ResponseHandler *handler, MessageSendParams &sendParams)
Translate an XRootD request into LocalFileHandler call.
XRootDStatus Open(const std::string &url, uint16_t flags, uint16_t mode, ResponseHandler *handler, uint16_t timeout=0)
XRootDStatus GetXAttr(const std::vector< std::string > &attrs, ResponseHandler *handler, uint16_t timeout=0)
Handle diagnostics.
Definition: XrdClLog.hh:101
void Error(uint64_t topic, const char *format,...)
Report an error.
Definition: XrdClLog.cc:231
void Debug(uint64_t topic, const char *format,...)
Print a debug message.
Definition: XrdClLog.cc:282
The message representation used throughout the system.
Definition: XrdClMessage.hh:32
uint16_t GetVirtReqID() const
Get virtual request ID for the message.
Open operation.
Information returned by file open operation.
Handle an async response.
Object stat info.
bool ParseServerResponse(const char *data)
Parse server response and fill up the object.
Synchronize the response.
URL representation.
Definition: XrdClURL.hh:31
std::string GetURL() const
Get the URL.
Definition: XrdClURL.hh:86
void SetSize(uint32_t size)
Set size.
ChunkList & GetChunks()
Get chunks.
static XrdSysXAttr * Xat
Definition: XrdSysFAttr.hh:51
char Name[1]
Start of the name (size of struct is dynamic)
Definition: XrdSysXAttr.hh:56
int Vlen
The length of the attribute value.
Definition: XrdSysXAttr.hh:54
virtual int List(AList **aPL, const char *Path, int fd=-1, int getSz=0)=0
virtual int Get(const char *Aname, void *Aval, int Avsz, const char *Path, int fd=-1)=0
virtual int Set(const char *Aname, const void *Aval, int Avsz, const char *Path, int fd=-1, int isNew=0)=0
int Nlen
The length of the attribute name that follows.
Definition: XrdSysXAttr.hh:55
virtual void Free(AList *aPL)=0
virtual int Del(const char *Aname, const char *Path, int fd=-1)=0
AList * Next
Pointer to the next element.
Definition: XrdSysXAttr.hh:53
WriteImpl< false > Write(Ctx< File > file, Arg< uint64_t > offset, Arg< uint32_t > size, Arg< const void * > buffer, uint16_t timeout=0)
Factory for creating WriteImpl objects.
const uint16_t stError
An error occurred that could potentially be retried.
Definition: XrdClStatus.hh:32
ReadImpl< false > Read(Ctx< File > file, Arg< uint64_t > offset, Arg< uint32_t > size, Arg< void * > buffer, uint16_t timeout=0)
Factory for creating ReadImpl objects.
std::vector< HostInfo > HostList
const uint16_t errDataError
data is corrupted
Definition: XrdClStatus.hh:63
const uint16_t stOK
Everything went OK.
Definition: XrdClStatus.hh:31
const uint64_t FileMsg
const uint16_t errOSError
Definition: XrdClStatus.hh:61
SyncImpl< false > Sync(Ctx< File > file, uint16_t timeout=0)
Factory for creating SyncImpl objects.
const uint16_t errInvalidArgs
Definition: XrdClStatus.hh:58
std::vector< ChunkInfo > ChunkList
List of chunks.
const uint16_t errNotSupported
Definition: XrdClStatus.hh:62
const uint16_t errLocalError
Definition: XrdClStatus.hh:107
const int DefaultAioSignal
static char * NVecRead(char *buffer, kXR_unt16 &rc)
Definition: XProtocol.cc:205
static char * VVecRead(char *buffer, kXR_int32 &len)
Definition: XProtocol.cc:224
Describe a data chunk for vector read.
uint16_t code
Error type, or additional hints on what to do.
Definition: XrdClStatus.hh:147
bool IsOK() const
We're fine.
Definition: XrdClStatus.hh:124
uint32_t errNo
Errno, if any.
Definition: XrdClStatus.hh:148
Extended attribute operation status.
Extended attributes with status.