XRootD
Loading...
Searching...
No Matches
XrdClThirdPartyCopyJob.cc
Go to the documentation of this file.
1//------------------------------------------------------------------------------
2// Copyright (c) 2011-2014 by European Organization for Nuclear Research (CERN)
3// Author: Lukasz Janyst <ljanyst@cern.ch>
4//------------------------------------------------------------------------------
5// This file is part of the XRootD software suite.
6//
7// XRootD is free software: you can redistribute it and/or modify
8// it under the terms of the GNU Lesser General Public License as published by
9// the Free Software Foundation, either version 3 of the License, or
10// (at your option) any later version.
11//
12// XRootD is distributed in the hope that it will be useful,
13// but WITHOUT ANY WARRANTY; without even the implied warranty of
14// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15// GNU General Public License for more details.
16//
17// You should have received a copy of the GNU Lesser General Public License
18// along with XRootD. If not, see <http://www.gnu.org/licenses/>.
19//
20// In applying this licence, CERN does not waive the privileges and immunities
21// granted to it by virtue of its status as an Intergovernmental Organization
22// or submit itself to any jurisdiction.
23//------------------------------------------------------------------------------
24
27#include "XrdCl/XrdClLog.hh"
29#include "XrdCl/XrdClUtils.hh"
31#include "XrdCl/XrdClMonitor.hh"
33#include "XrdCl/XrdClDlgEnv.hh"
34#include "XrdOuc/XrdOucTPC.hh"
36#include "XrdSys/XrdSysTimer.hh"
37
38#include <iostream>
39#include <chrono>
40
41#include <cctype>
42#include <sstream>
43#include <cstdlib>
44#include <cstdio>
45#include <sys/time.h>
46#include <sys/types.h>
47#include <unistd.h>
48
49namespace
50{
51 //----------------------------------------------------------------------------
53 //----------------------------------------------------------------------------
54 class TPCStatusHandler: public XrdCl::ResponseHandler
55 {
56 public:
57 //------------------------------------------------------------------------
58 // Constructor
59 //------------------------------------------------------------------------
60 TPCStatusHandler():
61 pSem( new XrdSysSemaphore(0) ), pStatus(0)
62 {
63 }
64
65 //------------------------------------------------------------------------
66 // Destructor
67 //------------------------------------------------------------------------
68 virtual ~TPCStatusHandler()
69 {
70 delete pStatus;
71 delete pSem;
72 }
73
74 //------------------------------------------------------------------------
75 // Handle Response
76 //------------------------------------------------------------------------
77 virtual void HandleResponse( XrdCl::XRootDStatus *status,
78 XrdCl::AnyObject *response )
79 {
80 delete response;
81 pStatus = status;
82 pSem->Post();
83 }
84
85 //------------------------------------------------------------------------
86 // Get Mutex
87 //------------------------------------------------------------------------
88 XrdSysSemaphore *GetXrdSysSemaphore()
89 {
90 return pSem;
91 }
92
93 //------------------------------------------------------------------------
94 // Get status
95 //------------------------------------------------------------------------
96 XrdCl::XRootDStatus *GetStatus()
97 {
98 return pStatus;
99 }
100
101 private:
102 TPCStatusHandler(const TPCStatusHandler &other);
103 TPCStatusHandler &operator = (const TPCStatusHandler &other);
104
105 XrdSysSemaphore *pSem;
106 XrdCl::XRootDStatus *pStatus;
107 };
108
109 class InitTimeoutCalc
110 {
111 public:
112
113 InitTimeoutCalc( uint16_t timeLeft ) :
114 hasInitTimeout( timeLeft ), start( time( 0 ) ), timeLeft( timeLeft )
115 {
116
117 }
118
119 XrdCl::XRootDStatus operator()()
120 {
121 if( !hasInitTimeout ) return XrdCl::XRootDStatus();
122
123 time_t now = time( 0 );
124 if( now - start > timeLeft )
126
127 timeLeft -= ( now - start );
128 return XrdCl::XRootDStatus();
129 }
130
131 operator uint16_t()
132 {
133 return timeLeft;
134 }
135
136 private:
137 bool hasInitTimeout;
138 time_t start;
139 uint16_t timeLeft;
140 };
141
142 static XrdCl::XRootDStatus& UpdateErrMsg( XrdCl::XRootDStatus &status, const std::string &str )
143 {
144 std::string msg = status.GetErrorMessage();
145 msg += " (" + str + ")";
146 status.SetErrorMessage( msg );
147 return status;
148 }
149}
150
151namespace XrdCl
152{
153 //----------------------------------------------------------------------------
154 // Constructor
155 //----------------------------------------------------------------------------
157 PropertyList *jobProperties,
158 PropertyList *jobResults ):
159 CopyJob( jobId, jobProperties, jobResults ),
160 dstFile( File::DisableVirtRedirect ),
161 sourceSize( 0 ),
162 initTimeout( 0 ),
163 force( false ),
164 coerce( false ),
165 delegate( false ),
166 nbStrm( 0 ),
167 tpcLite( false )
168 {
169 Log *log = DefaultEnv::GetLog();
170 log->Debug( UtilityMsg, "Creating a third party copy job, from %s to %s",
171 GetSource().GetObfuscatedURL().c_str(), GetTarget().GetObfuscatedURL().c_str() );
172 }
173
174 //----------------------------------------------------------------------------
175 // Run the copy job
176 //----------------------------------------------------------------------------
178 {
179 Log *log = DefaultEnv::GetLog();
180
181 XRootDStatus st = CanDo();
182 if( !st.IsOK() ) return st;
183
184 if( tpcLite )
185 {
186 //------------------------------------------------------------------------
187 // Run TPC-lite algorithm
188 //------------------------------------------------------------------------
189 XRootDStatus st = RunLite( progress );
190 if( !st.IsOK() ) return st;
191 }
192 else
193 {
194 //------------------------------------------------------------------------
195 // Run vanilla TPC algorithm
196 //------------------------------------------------------------------------
197 XRootDStatus st = RunTPC( progress );
198 if( !st.IsOK() ) return st;
199 }
200
201 //--------------------------------------------------------------------------
202 // Verify the checksums if needed
203 //--------------------------------------------------------------------------
204 if( checkSumMode != "none" )
205 {
206 log->Debug( UtilityMsg, "Attempting checksum calculation." );
207 std::string sourceCheckSum;
208 std::string targetCheckSum;
209
210 //------------------------------------------------------------------------
211 // Get the check sum at source
212 //------------------------------------------------------------------------
213 timeval oStart, oEnd;
214 XRootDStatus st;
215 if( checkSumMode == "end2end" || checkSumMode == "source" ||
216 !checkSumPreset.empty() )
217 {
218 gettimeofday( &oStart, 0 );
219 if( !checkSumPreset.empty() )
220 {
221 sourceCheckSum = checkSumType + ":";
222 sourceCheckSum += Utils::NormalizeChecksum( checkSumType,
223 checkSumPreset );
224 }
225 else
226 {
227 VirtualRedirector *redirector = 0;
228 std::string vrCheckSum;
229 if( GetSource().IsMetalink() &&
230 ( redirector = RedirectorRegistry::Instance().Get( GetSource() ) ) &&
231 !( vrCheckSum = redirector->GetCheckSum( checkSumType ) ).empty() )
232 sourceCheckSum = vrCheckSum;
233 else
234 st = Utils::GetRemoteCheckSum( sourceCheckSum, checkSumType, tpcSource );
235 }
236 gettimeofday( &oEnd, 0 );
237 if( !st.IsOK() )
238 return UpdateErrMsg( st, "source" );
239
240 pResults->Set( "sourceCheckSum", sourceCheckSum );
241 }
242
243 //------------------------------------------------------------------------
244 // Get the check sum at destination
245 //------------------------------------------------------------------------
246 timeval tStart, tEnd;
247
248 if( checkSumMode == "end2end" || checkSumMode == "target" )
249 {
250 gettimeofday( &tStart, 0 );
251 st = Utils::GetRemoteCheckSum( targetCheckSum, checkSumType, realTarget );
252
253 gettimeofday( &tEnd, 0 );
254 if( !st.IsOK() )
255 return UpdateErrMsg( st, "destination" );
256 pResults->Set( "targetCheckSum", targetCheckSum );
257 }
258
259 //------------------------------------------------------------------------
260 // Make sure the checksums are both lower case
261 //------------------------------------------------------------------------
262 auto sanitize_cksum = []( char c )
263 {
264 std::locale loc;
265 if( std::isalpha( c ) ) return std::tolower( c, loc );
266 return c;
267 };
268
269 std::transform( sourceCheckSum.begin(), sourceCheckSum.end(),
270 sourceCheckSum.begin(), sanitize_cksum );
271
272 std::transform( targetCheckSum.begin(), targetCheckSum.end(),
273 targetCheckSum.begin(), sanitize_cksum );
274
275 //------------------------------------------------------------------------
276 // Compare and inform monitoring
277 //------------------------------------------------------------------------
278 if( !sourceCheckSum.empty() && !targetCheckSum.empty() )
279 {
280 bool match = false;
281 if( sourceCheckSum == targetCheckSum )
282 match = true;
283
285 if( mon )
286 {
290 i.cksum = sourceCheckSum;
291 i.oTime = Utils::GetElapsedMicroSecs( oStart, oEnd );
292 i.tTime = Utils::GetElapsedMicroSecs( tStart, tEnd );
293 i.isOK = match;
294 mon->Event( Monitor::EvCheckSum, &i );
295 }
296
297 if( !match )
299
300 log->Info(UtilityMsg, "Checksum verification: succeeded." );
301 }
302 }
303
304 return XRootDStatus();
305 }
306
307 //----------------------------------------------------------------------------
308 // Check whether doing a third party copy is feasible for given
309 // job descriptor
310 //----------------------------------------------------------------------------
311 XRootDStatus ThirdPartyCopyJob::CanDo()
312 {
313 const URL &source = GetSource();
314 const URL &target = GetTarget();
315
316 //--------------------------------------------------------------------------
317 // We can only do a TPC if both source and destination are remote files
318 //--------------------------------------------------------------------------
319 if( source.IsLocalFile() || target.IsLocalFile() )
321 "Cannot do a third-party-copy for local file." );
322
323 //--------------------------------------------------------------------------
324 // Check the initial settings
325 //--------------------------------------------------------------------------
326 Log *log = DefaultEnv::GetLog();
327 log->Debug( UtilityMsg, "Check if third party copy between %s and %s "
328 "is possible", source.GetObfuscatedURL().c_str(),
329 target.GetObfuscatedURL().c_str() );
330
331 if( target.GetProtocol() != "root" &&
332 target.GetProtocol() != "xroot" &&
333 target.GetProtocol() != "roots" &&
334 target.GetProtocol() != "xroots" )
335 return XRootDStatus( stError, errNotSupported, 0, "Third-party-copy "
336 "is only supported for root/xroot protocol." );
337
338 pProperties->Get( "initTimeout", initTimeout );
339 InitTimeoutCalc timeLeft( initTimeout );
340
341 pProperties->Get( "checkSumMode", checkSumMode );
342 pProperties->Get( "checkSumType", checkSumType );
343 pProperties->Get( "checkSumPreset", checkSumPreset );
344 pProperties->Get( "force", force );
345 pProperties->Get( "coerce", coerce );
346 pProperties->Get( "delegate", delegate );
347
349 env->GetInt( "SubStreamsPerChannel", nbStrm );
350
351 // account for the control stream
352 if (nbStrm > 0) --nbStrm;
353
354 bool tpcLiteOnly = false;
355
356 if( !delegate )
357 log->Info( UtilityMsg, "We are NOT using delegation" );
358
359 //--------------------------------------------------------------------------
360 // Resolve the 'auto' checksum type.
361 //--------------------------------------------------------------------------
362 if( checkSumType == "auto" )
363 {
364 checkSumType = Utils::InferChecksumType( GetSource(), GetTarget() );
365 if( checkSumType.empty() )
366 log->Info( UtilityMsg, "Could not infer checksum type." );
367 else
368 log->Info( UtilityMsg, "Using inferred checksum type: %s.", checkSumType.c_str() );
369 }
370
371 //--------------------------------------------------------------------------
372 // Check if we can open the source. Note in TPC-lite scenario it is optional
373 // for this step to be successful.
374 //--------------------------------------------------------------------------
375 File sourceFile;
376 XRootDStatus st;
377 URL sourceURL = source;
378 URL::ParamsMap params;
379
380 // set ReadRecovery property
381 std::string value;
382 DefaultEnv::GetEnv()->GetString( "ReadRecovery", value );
383 sourceFile.SetProperty( "ReadRecovery", value );
384
385 // save the original opaque parameter list as specified by the user for later
386 const URL::ParamsMap &srcparams = sourceURL.GetParams();
387
388 //--------------------------------------------------------------------------
389 // Do the facultative step at source only if the protocol is root/xroot,
390 // otherwise don't bother
391 //--------------------------------------------------------------------------
392 if( sourceURL.GetProtocol() == "root" || sourceURL.GetProtocol() == "xroot" ||
393 sourceURL.GetProtocol() == "roots" || sourceURL.GetProtocol() == "xroots" )
394 {
395 params = sourceURL.GetParams();
396 params["tpc.stage"] = "placement";
397 sourceURL.SetParams( params );
398 log->Debug( UtilityMsg, "Trying to open %s for reading",
399 sourceURL.GetObfuscatedURL().c_str() );
400 st = sourceFile.Open( sourceURL.GetURL(), OpenFlags::Read, Access::None,
401 timeLeft );
402 }
403 else
404 st = XRootDStatus( stError, errNotSupported );
405
406 if( st.IsOK() )
407 {
408 std::string sourceUrl;
409 sourceFile.GetProperty( "LastURL", sourceUrl );
410 tpcSource = sourceUrl;
411
412 VirtualRedirector *redirector = 0;
413 long long size = -1;
414 if( source.IsMetalink() &&
415 ( redirector = RedirectorRegistry::Instance().Get( tpcSource ) ) &&
416 ( size = redirector->GetSize() ) >= 0 )
417 sourceSize = size;
418 else
419 {
420 StatInfo *statInfo;
421 st = sourceFile.Stat( false, statInfo );
422 if (st.IsOK()) sourceSize = statInfo->GetSize();
423 delete statInfo;
424 }
425 }
426 else
427 {
428 log->Info( UtilityMsg, "Cannot open source file %s: %s",
429 source.GetObfuscatedURL().c_str(), st.ToStr().c_str() );
430 if( !delegate )
431 {
432 //----------------------------------------------------------------------
433 // If we cannot contact the source and there is no credential to delegate
434 // it cannot possibly work
435 //----------------------------------------------------------------------
436 st.status = stFatal;
437 return st;
438 }
439
440 tpcSource = sourceURL;
441 tpcLiteOnly = true;
442 }
443
444 // get the opaque parameters as returned by the redirector
445 URL tpcSourceUrl = tpcSource;
446 URL::ParamsMap tpcsrcparams = tpcSourceUrl.GetParams();
447 // merge the original cgi with the one returned by the redirector,
448 // the original values take precedence
449 URL::ParamsMap::const_iterator itr = srcparams.begin();
450 for( ; itr != srcparams.end(); ++itr )
451 tpcsrcparams[itr->first] = itr->second;
452 tpcSourceUrl.SetParams( tpcsrcparams );
453 // save the merged opaque parameter list for later
454 std::string scgi;
455 const URL::ParamsMap &scgiparams = tpcSourceUrl.GetParams();
456 itr = scgiparams.begin();
457 for( ; itr != scgiparams.end(); ++itr )
458 if( itr->first.compare( 0, 6, "xrdcl." ) != 0 )
459 {
460 if( !scgi.empty() ) scgi += '\t';
461 scgi += itr->first + '=' + itr->second;
462 }
463
464 if( !timeLeft().IsOK() )
465 {
466 // we still want to send a close, but we time it out quickly
467 st = sourceFile.Close( 1 );
468 return XRootDStatus( stError, errOperationExpired );
469 }
470
471 st = sourceFile.Close( timeLeft );
472
473 if( !timeLeft().IsOK() )
474 return XRootDStatus( stError, errOperationExpired );
475
476 //--------------------------------------------------------------------------
477 // Now we need to check the destination !!!
478 //--------------------------------------------------------------------------
479 if( delegate )
481 else
483
484 //--------------------------------------------------------------------------
485 // Generate the destination CGI
486 //--------------------------------------------------------------------------
487 log->Debug( UtilityMsg, "Generating the destination TPC URL" );
488
489 tpcKey = GenerateKey();
490
491 char *cgiBuff = new char[2048];
492 const char *cgiP = XrdOucTPC::cgiC2Dst( tpcKey.c_str(),
493 tpcSource.GetHostId().c_str(),
494 tpcSource.GetPath().c_str(),
495 0, cgiBuff, 2048, nbStrm,
496 GetSource().GetHostId().c_str(),
497 GetSource().GetProtocol().c_str(),
498 GetTarget().GetProtocol().c_str(),
499 delegate );
500
501 if( *cgiP == '!' )
502 {
503 log->Error( UtilityMsg, "Unable to setup target url: %s", cgiP+1 );
504 delete [] cgiBuff;
505 return XRootDStatus( stError, errNotSupported );
506 }
507
508 URL cgiURL; cgiURL.SetParams( cgiBuff );
509 delete [] cgiBuff;
510
511 realTarget = GetTarget();
512 params = realTarget.GetParams();
513 MessageUtils::MergeCGI( params, cgiURL.GetParams(), true );
514
515 if( !tpcLiteOnly ) // we only append oss.asize if it source file size is actually known
516 {
517 std::ostringstream o; o << sourceSize;
518 params["oss.asize"] = o.str();
519 }
520 params["tpc.stage"] = "copy";
521
522 // forward source cgi info to the destination in case we are going to do delegation
523 if( !scgi.empty() && delegate )
524 params["tpc.scgi"] = scgi;
525
526 realTarget.SetParams( params );
527
528 log->Debug( UtilityMsg, "Target url is: %s", realTarget.GetObfuscatedURL().c_str() );
529
530 //--------------------------------------------------------------------------
531 // Open the target file
532 //--------------------------------------------------------------------------
533 // set WriteRecovery property
534 DefaultEnv::GetEnv()->GetString( "WriteRecovery", value );
535 dstFile.SetProperty( "WriteRecovery", value );
536
538 if( force )
539 targetFlags |= OpenFlags::Delete;
540 else
541 targetFlags |= OpenFlags::New;
542
543 if( coerce )
544 targetFlags |= OpenFlags::Force;
545
547 st = dstFile.Open( realTarget.GetURL(), targetFlags, mode, timeLeft );
548 if( !st.IsOK() )
549 {
550 log->Error( UtilityMsg, "Unable to open target %s: %s",
551 realTarget.GetObfuscatedURL().c_str(), st.ToStr().c_str() );
552 if( st.code == errErrorResponse &&
553 st.errNo == kXR_FSError &&
554 st.GetErrorMessage().find( "tpc not supported" ) != std::string::npos )
555 return XRootDStatus( stError, errNotSupported, 0, // the open failed due to lack of TPC support on the server side
556 "Destination does not support third-party-copy." );
557 return UpdateErrMsg( st, "destination" );
558 }
559
560 std::string lastUrl; dstFile.GetProperty( "LastURL", lastUrl );
561 realTarget = lastUrl;
562
563 if( !timeLeft().IsOK() )
564 {
565 // we still want to send a close, but we time it out fast
566 st = dstFile.Close( 1 );
567 return XRootDStatus( stError, errOperationExpired );
568 }
569
570 //--------------------------------------------------------------------------
571 // Verify if the destination supports TPC / TPC-lite
572 //--------------------------------------------------------------------------
573 st = Utils::CheckTPCLite( realTarget.GetURL() );
574 if( !st.IsOK() )
575 {
576 // we still want to send a close, but we time it out fast
577 st = dstFile.Close( 1 );
578 return XRootDStatus( stError, errNotSupported, 0, // doesn't support TPC
579 "Destination does not support third-party-copy.");
580 }
581
582 //--------------------------------------------------------------------------
583 // if target supports TPC-lite and we have a credential to delegate we can
584 // go ahead and use TPC-lite
585 //--------------------------------------------------------------------------
586 tpcLite = ( st.code != suPartial ) && delegate;
587
588 if( !tpcLite && tpcLiteOnly ) // doesn't support TPC-lite and it was our only hope
589 {
590 st = dstFile.Close( 1 );
591 return XRootDStatus( stError, errNotSupported, 0, "Destination does not "
592 "support delegation." );
593 }
594
595 //--------------------------------------------------------------------------
596 // adjust the InitTimeout
597 //--------------------------------------------------------------------------
598 if( !timeLeft().IsOK() )
599 {
600 // we still want to send a close, but we time it out fast
601 st = dstFile.Close( 1 );
602 return XRootDStatus( stError, errOperationExpired );
603 }
604
605 //--------------------------------------------------------------------------
606 // If we don't use delegation the source has to support TPC
607 //--------------------------------------------------------------------------
608 if( !tpcLite )
609 {
610 st = Utils::CheckTPC( URL( tpcSource ).GetURL(), timeLeft );
611 if( !st.IsOK() )
612 {
613 log->Error( UtilityMsg, "Source (%s) does not support TPC",
614 tpcSource.GetURL().c_str() );
615 return XRootDStatus( stError, errNotSupported, 0, "Source does not "
616 "support third-party-copy" );
617 }
618
619 if( !timeLeft().IsOK() )
620 {
621 // we still want to send a close, but we time it out quickly
622 st = sourceFile.Close( 1 );
623 return XRootDStatus( stError, errOperationExpired );
624 }
625 }
626
627 initTimeout = uint16_t( timeLeft );
628
629 return XRootDStatus();
630 }
631
632 //----------------------------------------------------------------------------
633 // Run vanilla copy job
634 //----------------------------------------------------------------------------
635 XRootDStatus ThirdPartyCopyJob::RunTPC( CopyProgressHandler *progress )
636 {
637 Log *log = DefaultEnv::GetLog();
638
639 //--------------------------------------------------------------------------
640 // Generate the source CGI
641 //--------------------------------------------------------------------------
642 char *cgiBuff = new char[2048];
643 const char *cgiP = XrdOucTPC::cgiC2Src( tpcKey.c_str(),
644 realTarget.GetHostName().c_str(), -1, cgiBuff,
645 2048 );
646 if( *cgiP == '!' )
647 {
648 log->Error( UtilityMsg, "Unable to setup source url: %s", cgiP+1 );
649 delete [] cgiBuff;
650 return XRootDStatus( stError, errInvalidArgs );
651 }
652
653 URL cgiURL; cgiURL.SetParams( cgiBuff );
654 delete [] cgiBuff;
655 URL::ParamsMap params = tpcSource.GetParams();
656 MessageUtils::MergeCGI( params, cgiURL.GetParams(), true );
657 params["tpc.stage"] = "copy";
658 tpcSource.SetParams( params );
659
660 log->Debug( UtilityMsg, "Source url is: %s", tpcSource.GetObfuscatedURL().c_str() );
661
662 // Set the close timeout to the default value of the stream timeout
663 int closeTimeout = 0;
664 (void) DefaultEnv::GetEnv()->GetInt( "StreamTimeout", closeTimeout);
665
666 //--------------------------------------------------------------------------
667 // Set up the rendez-vous and open the source
668 //--------------------------------------------------------------------------
669 InitTimeoutCalc timeLeft( initTimeout );
670 XRootDStatus st = dstFile.Sync( timeLeft );
671 if( !st.IsOK() )
672 {
673 log->Error( UtilityMsg, "Unable set up rendez-vous: %s",
674 st.ToStr().c_str() );
675 XRootDStatus status = dstFile.Close( closeTimeout );
676 return UpdateErrMsg( st, "destination" );
677 }
678
679 //--------------------------------------------------------------------------
680 // Calculate the time we have left to perform source open
681 //--------------------------------------------------------------------------
682 if( !timeLeft().IsOK() )
683 {
684 XRootDStatus status = dstFile.Close( closeTimeout );
685 return XRootDStatus( stError, errOperationExpired );
686 }
687
688 File sourceFile( File::DisableVirtRedirect );
689 // set ReadRecovery property
690 std::string value;
691 DefaultEnv::GetEnv()->GetString( "ReadRecovery", value );
692 sourceFile.SetProperty( "ReadRecovery", value );
693
694 st = sourceFile.Open( tpcSource.GetURL(), OpenFlags::Read, Access::None,
695 timeLeft );
696
697 if( !st.IsOK() )
698 {
699 log->Error( UtilityMsg, "Unable to open source %s: %s",
700 tpcSource.GetObfuscatedURL().c_str(), st.ToStr().c_str() );
701 XRootDStatus status = dstFile.Close( closeTimeout );
702 return UpdateErrMsg( st, "source" );
703 }
704
705 //--------------------------------------------------------------------------
706 // Do the copy and follow progress
707 //--------------------------------------------------------------------------
708 TPCStatusHandler statusHandler;
709 XrdSysSemaphore *sem = statusHandler.GetXrdSysSemaphore();
710 StatInfo *info = 0;
711
712 uint16_t tpcTimeout = 0;
713 pProperties->Get( "tpcTimeout", tpcTimeout );
714
715 st = dstFile.Sync( &statusHandler, tpcTimeout );
716 if( !st.IsOK() )
717 {
718 log->Error( UtilityMsg, "Unable start the copy: %s",
719 st.ToStr().c_str() );
720 XRootDStatus statusS = sourceFile.Close( closeTimeout );
721 XRootDStatus statusT = dstFile.Close( closeTimeout );
722 return UpdateErrMsg( st, "destination" );
723 }
724
725 //--------------------------------------------------------------------------
726 // Stat the file every second until sync returns
727 //--------------------------------------------------------------------------
728 bool canceled = false;
729 while( 1 )
730 {
731 XrdSysTimer::Wait( 2500 );
732
733 if( progress )
734 {
735 st = dstFile.Stat( true, info );
736 if( st.IsOK() )
737 {
738 progress->JobProgress( pJobId, info->GetSize(), sourceSize );
739 delete info;
740 info = 0;
741 }
742 bool shouldCancel = progress->ShouldCancel( pJobId );
743 if( shouldCancel )
744 {
745 log->Debug( UtilityMsg, "Cancellation requested by progress handler" );
746 Buffer arg, *response = 0; arg.FromString( "ofs.tpc cancel" );
747 XRootDStatus st = dstFile.Fcntl( arg, response );
748 if( !st.IsOK() )
749 log->Debug( UtilityMsg, "Error while trying to cancel tpc: %s",
750 st.ToStr().c_str() );
751
752 delete response;
753 canceled = true;
754 break;
755 }
756 }
757
758 if( sem->CondWait() )
759 break;
760 }
761
762 //--------------------------------------------------------------------------
763 // Sync has returned so we can check if it was successful
764 //--------------------------------------------------------------------------
765 if( canceled )
766 sem->Wait();
767
768 st = *statusHandler.GetStatus();
769
770 if( !st.IsOK() )
771 {
772 log->Error( UtilityMsg, "Third party copy from %s to %s failed: %s",
773 GetSource().GetObfuscatedURL().c_str(), GetTarget().GetObfuscatedURL().c_str(),
774 st.ToStr().c_str() );
775
776 // Ignore close response
777 XRootDStatus statusS = sourceFile.Close( closeTimeout );
778 XRootDStatus statusT = dstFile.Close( closeTimeout );
779 return st;
780 }
781
782 XRootDStatus statusS = sourceFile.Close( closeTimeout );
783 XRootDStatus statusT = dstFile.Close( closeTimeout );
784
785 if ( !statusS.IsOK() || !statusT.IsOK() )
786 {
787 st = (statusS.IsOK() ? statusT : statusS);
788 log->Error( UtilityMsg, "Third party copy from %s to %s failed during "
789 "close of %s: %s", GetSource().GetObfuscatedURL().c_str(),
790 GetTarget().GetObfuscatedURL().c_str(),
791 (statusS.IsOK() ? "destination" : "source"), st.ToStr().c_str() );
792 return UpdateErrMsg( st, statusS.IsOK() ? "source" : "destination" );
793 }
794
795 log->Debug( UtilityMsg, "Third party copy from %s to %s successful",
796 GetSource().GetObfuscatedURL().c_str(), GetTarget().GetObfuscatedURL().c_str() );
797
798 pResults->Set( "size", sourceSize );
799
800 return XRootDStatus();
801 }
802
803 XRootDStatus ThirdPartyCopyJob::RunLite( CopyProgressHandler *progress )
804 {
805 Log *log = DefaultEnv::GetLog();
806
807 // Set the close timeout to the default value of the stream timeout
808 int closeTimeout = 0;
809 (void) DefaultEnv::GetEnv()->GetInt( "StreamTimeout", closeTimeout);
810
811 //--------------------------------------------------------------------------
812 // Set up the rendez-vous
813 //--------------------------------------------------------------------------
814 InitTimeoutCalc timeLeft( initTimeout );
815 XRootDStatus st = dstFile.Sync( timeLeft );
816 if( !st.IsOK() )
817 {
818 log->Error( UtilityMsg, "Unable set up rendez-vous: %s",
819 st.ToStr().c_str() );
820 XRootDStatus status = dstFile.Close( closeTimeout );
821 return UpdateErrMsg( st, "destination" );
822 }
823
824 //--------------------------------------------------------------------------
825 // Do the copy and follow progress
826 //--------------------------------------------------------------------------
827 TPCStatusHandler statusHandler;
828 XrdSysSemaphore *sem = statusHandler.GetXrdSysSemaphore();
829 StatInfo *info = 0;
830
831 uint16_t tpcTimeout = 0;
832 pProperties->Get( "tpcTimeout", tpcTimeout );
833
834 st = dstFile.Sync( &statusHandler, tpcTimeout );
835 if( !st.IsOK() )
836 {
837 log->Error( UtilityMsg, "Unable start the copy: %s",
838 st.ToStr().c_str() );
839 XRootDStatus statusT = dstFile.Close( closeTimeout );
840 return UpdateErrMsg( st, "destination" );
841 }
842
843 //--------------------------------------------------------------------------
844 // Stat the file every second until sync returns
845 //--------------------------------------------------------------------------
846 bool canceled = false;
847 while( 1 )
848 {
849 XrdSysTimer::Wait( 2500 );
850
851 if( progress )
852 {
853 st = dstFile.Stat( true, info );
854 if( st.IsOK() )
855 {
856 progress->JobProgress( pJobId, info->GetSize(), sourceSize );
857 delete info;
858 info = 0;
859 }
860 bool shouldCancel = progress->ShouldCancel( pJobId );
861 if( shouldCancel )
862 {
863 log->Debug( UtilityMsg, "Cancellation requested by progress handler" );
864 Buffer arg, *response = 0; arg.FromString( "ofs.tpc cancel" );
865 XRootDStatus st = dstFile.Fcntl( arg, response );
866 if( !st.IsOK() )
867 log->Debug( UtilityMsg, "Error while trying to cancel tpc: %s",
868 st.ToStr().c_str() );
869
870 delete response;
871 canceled = true;
872 break;
873 }
874 }
875
876 if( sem->CondWait() )
877 break;
878 }
879
880 //--------------------------------------------------------------------------
881 // Sync has returned so we can check if it was successful
882 //--------------------------------------------------------------------------
883 if( canceled )
884 sem->Wait();
885
886 st = *statusHandler.GetStatus();
887
888 if( !st.IsOK() )
889 {
890 log->Error( UtilityMsg, "Third party copy from %s to %s failed: %s",
891 GetSource().GetObfuscatedURL().c_str(), GetTarget().GetObfuscatedURL().c_str(),
892 st.ToStr().c_str() );
893
894 // Ignore close response
895 XRootDStatus statusT = dstFile.Close( closeTimeout );
896 return st;
897 }
898
899 st = dstFile.Close( closeTimeout );
900
901 if ( !st.IsOK() )
902 {
903 log->Error( UtilityMsg, "Third party copy from %s to %s failed during "
904 "close of %s: %s", GetSource().GetObfuscatedURL().c_str(),
905 GetTarget().GetObfuscatedURL().c_str(),
906 "destination", st.ToStr().c_str() );
907 return UpdateErrMsg( st, "destination" );
908 }
909
910 log->Debug( UtilityMsg, "Third party copy from %s to %s successful",
911 GetSource().GetObfuscatedURL().c_str(), GetTarget().GetObfuscatedURL().c_str() );
912
913 pResults->Set( "size", sourceSize );
914
915 return XRootDStatus();
916 }
917
918
919 //----------------------------------------------------------------------------
920 // Generate a rendez-vous key
921 //----------------------------------------------------------------------------
922 std::string ThirdPartyCopyJob::GenerateKey()
923 {
924 static const int _10to9 = 1000000000;
925
926 char tpcKey[25];
927
928 auto tp = std::chrono::high_resolution_clock::now();
929 auto d = tp.time_since_epoch();
930 auto ns = std::chrono::duration_cast<std::chrono::nanoseconds>( d );
931 auto s = std::chrono::duration_cast<std::chrono::seconds>( d );
932 uint32_t k1 = ns.count() - s.count() * _10to9;
933 uint32_t k2 = getpid() | (getppid() << 16);
934 uint32_t k3 = s.count();
935 snprintf( tpcKey, 25, "%08x%08x%08x", k1, k2, k3 );
936 return std::string(tpcKey);
937 }
938}
@ kXR_FSError
Definition XProtocol.hh:995
XrdOucString File
PropertyList * pResults
const URL & GetSource() const
Get source.
const URL & GetTarget() const
Get target.
PropertyList * pProperties
Interface for copy progress notification.
static Monitor * GetMonitor()
Get the monitor object.
static Log * GetLog()
Get default log.
static Env * GetEnv()
Get default client environment.
void Enable()
Enable delegation in the environment.
static DlgEnv & Instance()
void Disable()
Disable delegation in the environment.
bool GetString(const std::string &key, std::string &value)
Definition XrdClEnv.cc:31
bool GetInt(const std::string &key, int &value)
Definition XrdClEnv.cc:89
A file.
Definition XrdClFile.hh:46
XRootDStatus Close(ResponseHandler *handler, uint16_t timeout=0) XRD_WARN_UNUSED_RESULT
Definition XrdClFile.cc:151
@ DisableVirtRedirect
Definition XrdClFile.hh:52
XRootDStatus Fcntl(const Buffer &arg, ResponseHandler *handler, uint16_t timeout=0) XRD_WARN_UNUSED_RESULT
Definition XrdClFile.cc:610
XRootDStatus Open(const std::string &url, OpenFlags::Flags flags, Access::Mode mode, ResponseHandler *handler, uint16_t timeout=0) XRD_WARN_UNUSED_RESULT
Definition XrdClFile.cc:99
bool GetProperty(const std::string &name, std::string &value) const
Definition XrdClFile.cc:878
XRootDStatus Stat(bool force, ResponseHandler *handler, uint16_t timeout=0) XRD_WARN_UNUSED_RESULT
Definition XrdClFile.cc:177
bool SetProperty(const std::string &name, const std::string &value)
Definition XrdClFile.cc:867
XRootDStatus Sync(ResponseHandler *handler, uint16_t timeout=0) XRD_WARN_UNUSED_RESULT
Definition XrdClFile.cc:414
Handle diagnostics.
Definition XrdClLog.hh:101
void Error(uint64_t topic, const char *format,...)
Report an error.
Definition XrdClLog.cc:231
void Info(uint64_t topic, const char *format,...)
Print an info.
Definition XrdClLog.cc:265
void Debug(uint64_t topic, const char *format,...)
Print a debug message.
Definition XrdClLog.cc:282
static void MergeCGI(URL::ParamsMap &cgi1, const URL::ParamsMap &cgi2, bool replace)
Merge cgi2 into cgi1.
An abstract class to describe the client-side monitoring plugin interface.
@ EvCheckSum
CheckSumInfo: File checksummed.
virtual void Event(EventCode evCode, void *evData)=0
A key-value pair map storing both keys and values as strings.
void Set(const std::string &name, const Item &value)
bool Get(const std::string &name, Item &item) const
static RedirectorRegistry & Instance()
Returns reference to the single instance.
VirtualRedirector * Get(const URL &url) const
Get a virtual redirector associated with the given URL.
Handle an async response.
virtual XRootDStatus Run(CopyProgressHandler *progress=0)
ThirdPartyCopyJob(uint16_t jobId, PropertyList *jobProperties, PropertyList *jobResults)
Constructor.
URL representation.
Definition XrdClURL.hh:31
const std::string & GetPath() const
Get the path.
Definition XrdClURL.hh:217
std::string GetHostId() const
Get the host part of the URL (user:password@host:port)
Definition XrdClURL.hh:99
bool IsMetalink() const
Is it a URL to a metalink.
Definition XrdClURL.cc:458
void SetParams(const std::string &params)
Set params.
Definition XrdClURL.cc:395
std::string GetURL() const
Get the URL.
Definition XrdClURL.hh:86
std::string GetObfuscatedURL() const
Get the URL with authz information obfuscated.
Definition XrdClURL.cc:491
const std::string & GetHostName() const
Get the name of the target host.
Definition XrdClURL.hh:170
std::map< std::string, std::string > ParamsMap
Definition XrdClURL.hh:33
bool IsLocalFile() const
Definition XrdClURL.cc:467
const ParamsMap & GetParams() const
Get the URL params.
Definition XrdClURL.hh:244
const std::string & GetProtocol() const
Get the protocol.
Definition XrdClURL.hh:118
static XRootDStatus CheckTPCLite(const std::string &server, uint16_t timeout=0)
static std::string NormalizeChecksum(const std::string &name, const std::string &checksum)
Normalize checksum.
static std::string InferChecksumType(const XrdCl::URL &source, const XrdCl::URL &destination, bool zip=false)
Automatically infer the right checksum type.
static uint64_t GetElapsedMicroSecs(timeval start, timeval end)
Get the elapsed microseconds between two timevals.
static XRootDStatus GetRemoteCheckSum(std::string &checkSum, const std::string &checkSumType, const URL &url)
Get a checksum from a remote xrootd server.
static XRootDStatus CheckTPC(const std::string &server, uint16_t timeout=0)
Check if peer supports tpc.
An interface for metadata redirectors.
virtual std::string GetCheckSum(const std::string &type) const =0
const std::string & GetErrorMessage() const
Get error message.
void SetErrorMessage(const std::string &message)
Set the error message.
static const char * cgiC2Dst(const char *cKey, const char *xSrc, const char *xLfn, const char *xCks, char *Buff, int Blen, int strms=0, const char *iHst=0, const char *sprt=0, const char *tprt=0, bool dlgon=false, bool push=false)
Definition XrdOucTPC.cc:62
static const char * cgiC2Src(const char *cKey, const char *xDst, int xTTL, char *Buff, int Blen)
Definition XrdOucTPC.cc:136
static void Wait(int milliseconds)
const uint16_t suPartial
const uint16_t errErrorResponse
const uint16_t errOperationExpired
const uint16_t stFatal
Fatal error, it's still an error.
const uint16_t stError
An error occurred that could potentially be retried.
const uint64_t UtilityMsg
const uint16_t errInvalidArgs
const uint16_t errNotSupported
const uint16_t errCheckSumError
XrdSysError Log
Definition XrdConfig.cc:112
@ UR
owner readable
@ GR
group readable
@ UW
owner writable
@ OR
world readable
Describe a checksum event.
TransferInfo transfer
The transfer in question.
uint64_t tTime
Microseconds to obtain cksum from target.
bool isOK
True if checksum matched, false otherwise.
std::string cksum
Checksum as "type:value".
uint64_t oTime
Microseconds to obtain cksum from origin.
const URL * target
URL of the target.
const URL * origin
URL of the origin.
Flags
Open flags, may be or'd when appropriate.
@ Read
Open only for reading.
@ Update
Open for reading and writing.
bool IsOK() const
We're fine.