Source code for pilot.copytool.xrdcp

#!/usr/bin/env python
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
#
# Authors:
# - Tobias Wegner, tobias.wegner@cern.ch, 2017-2018
# - Paul Nilsson, paul.nilsson@cern.ch, 2017-2021

# Reimplemented by Alexey Anisenkov

import os
import logging
import re
from time import time

from .common import resolve_common_transfer_errors, verify_catalog_checksum  #, get_timeout
from pilot.util.container import execute
from pilot.common.exception import PilotException, ErrorCodes
#from pilot.util.timer import timeout

# module-level logger, named after this module
logger = logging.getLogger(__name__)

# indicates whether this copytool requires input replicas to be resolved
require_replicas = True
# prioritized list of schemas supported for transfers by this copytool
allowed_schemas = ['root']

# name of the command line client used to build the version, help and transfer commands
copy_command = 'xrdcp'


def is_valid_for_copy_in(files):
    """
    Tell whether this copytool may be used to stage in the given files.

    No validation is implemented yet: the argument is not inspected and
    every request is accepted.

    :param files: files to be staged in (currently unused)
    :return: always True
    """

    return True  ## FIX ME LATER
def is_valid_for_copy_out(files):
    """
    Tell whether this copytool may be used to stage out the given files.

    No validation is implemented yet: the argument is not inspected and
    every request is accepted.

    :param files: files to be staged out (currently unused)
    :return: always True
    """

    return True  ## FIX ME LATER
def _resolve_checksum_option(setup, **kwargs):
    """
    Decide which option of the copy command should be used to calculate/verify the file checksum.

    Two commands are executed: `<copy_command> --version`, whose result is only logged, and
    `<copy_command> -h`, whose output is searched for the known checksum options.

    :param setup: optional setup script; when set it is sourced in front of each command
    :param kwargs: extra keyword arguments passed through to `execute`
    :return: checksum option (str); empty string if the help command failed or no known option was found
    """

    # client version is logged for information only, its return code is not acted upon
    cmd = "%s --version" % copy_command
    if setup:
        cmd = "source %s; %s" % (setup, cmd)

    logger.info("Execute command (%s) to check xrdcp client version", cmd)

    rcode, stdout, stderr = execute(cmd, **kwargs)
    logger.info("return code: %s", rcode)
    logger.info("return output: %s", stdout + stderr)

    # the help text tells which checksum option the installed client understands
    cmd = "%s -h" % copy_command
    if setup:
        cmd = "source %s; %s" % (setup, cmd)

    logger.info("Execute command (%s) to decide which option should be used to calc/verify file checksum..", cmd)

    rcode, stdout, stderr = execute(cmd, **kwargs)
    output = stdout + stderr
    logger.info("return code: %s", rcode)
    logger.debug("return output: %s", output)

    coption = ""
    checksum_type = 'adler32'  ## consider only adler32 for now

    if rcode:
        logger.error('FAILED to execute command=%s: %s', cmd, output)
    else:
        # options are tried in this order; the md5 branch cannot be taken while checksum_type is fixed to adler32
        if "--cksum" in output:
            coption = "--cksum %s:print" % checksum_type
        elif "-adler" in output and checksum_type == 'adler32':
            coption = "-adler"
        elif "-md5" in output and checksum_type == 'md5':
            coption = "-md5"

    if coption:
        logger.info("Use %s option to get the checksum for %s command", coption, copy_command)

    return coption
#@timeout(seconds=10800)
def _stagefile(coption, source, destination, filesize, is_stagein, setup=None, **kwargs):
    """
    Stage the file (stagein or stageout).

    :param coption: checksum option to add to the copy command (empty string for none)
    :param source: source of the transfer, inserted as is into the command line
    :param destination: destination of the transfer, inserted as is into the command line
    :param filesize: file size (currently unused, kept for the disabled timeout calculation)
    :param is_stagein: True for stage-in, False for stage-out (selects the error codes)
    :param setup: optional setup script; when set it is sourced in front of the copy command
    :param kwargs: extra keyword arguments passed through to `execute`
    :return: (filesize, checksum, checksum_type) as extracted from the command output;
             all three are None when no checksum option was given
    :raise: PilotException in case of controlled error
    """

    filesize_cmd, checksum_cmd, checksum_type = None, None, None

    # NOTE(review): the command is assembled by plain string formatting - presumably it is run
    # through a shell by `execute`, so source/destination are assumed to be trusted; confirm
    cmd = '%s -np -f %s %s %s' % (copy_command, coption, source, destination)
    if setup:
        cmd = "source %s; %s" % (setup, cmd)

    #timeout = get_timeout(filesize)
    #logger.info("Executing command: %s, timeout=%s" % (cmd, timeout))

    rcode, stdout, stderr = execute(cmd, **kwargs)
    logger.info('rcode=%d, stdout=%s, stderr=%s', rcode, stdout, stderr)

    if rcode:  ## error occurred
        error = resolve_common_transfer_errors(stdout + stderr, is_stagein=is_stagein)

        #rcode = error.get('rcode')  ## TO BE IMPLEMENTED
        #if not is_stagein and rcode == PilotErrors.ERR_CHKSUMNOTSUP: ## stage-out, on fly checksum verification is not supported .. ignore
        #    logger.info('stage-out: ignore ERR_CHKSUMNOTSUP error .. will explicitly verify uploaded file')
        #    return None, None

        raise PilotException(error.get('error'), code=error.get('rcode'), state=error.get('state'))

    # extract filesize and checksum values from output
    if coption != "":
        filesize_cmd, checksum_cmd, checksum_type = get_file_info_from_output(stdout + stderr)

    ## verify transfer by returned checksum or call remote checksum calculation
    ## to be moved at the base level

    is_verified = True  ## TO BE IMPLEMENTED LATER

    # placeholder: never taken until the verification above is implemented
    if not is_verified:
        rcode = ErrorCodes.GETADMISMATCH if is_stagein else ErrorCodes.PUTADMISMATCH
        raise PilotException("Copy command failed", code=rcode, state='AD_MISMATCH')

    return filesize_cmd, checksum_cmd, checksum_type
# @timeout(seconds=10800)
def copy_in(files, **kwargs):
    """
    Download given files using xrdcp command.

    For every file the trace report is updated and sent: once with the final state
    ('DONE' on success, the failure state otherwise). Processing stops at the first
    failing file by raising an exception.

    :param files: list of `FileSpec` objects
    :param kwargs: recognised keys are 'copytools' (removed from kwargs; its ['xrdcp']['setup'] entry
                   is used as setup script), 'trace_report' and 'workdir'; the remaining kwargs are
                   passed through to the command execution
    :return: the given list of files, with status and status_code set on each processed entry
    :raise: PilotException in case of controlled error
    """

    #allow_direct_access = kwargs.get('allow_direct_access') or False
    setup = kwargs.pop('copytools', {}).get('xrdcp', {}).get('setup')
    coption = _resolve_checksum_option(setup, **kwargs)
    trace_report = kwargs.get('trace_report')

    # note, env vars might be unknown inside middleware containers, if so get the value already in the trace report
    localsite = os.environ.get('RUCIO_LOCAL_SITE_ID', trace_report.get_value('localSite'))
    for fspec in files:
        # update the trace report
        # (once resolved from the first file's endpoint, localsite is reused for the following files)
        localsite = localsite if localsite else fspec.ddmendpoint
        trace_report.update(localSite=localsite, remoteSite=fspec.ddmendpoint, filesize=fspec.filesize)
        trace_report.update(filename=fspec.lfn, guid=fspec.guid.replace('-', ''))
        trace_report.update(scope=fspec.scope, dataset=fspec.dataset)

        # continue loop for files that are to be accessed directly  ## TOBE DEPRECATED (anisyonk)
        #if fspec.is_directaccess(ensure_replica=False) and allow_direct_access and fspec.accessmode == 'direct':
        #    fspec.status_code = 0
        #    fspec.status = 'remote_io'
        #    trace_report.update(url=fspec.turl, clientState='FOUND_ROOT', stateReason='direct_access')
        #    trace_report.send()
        #    continue

        trace_report.update(catStart=time())

        # the file specific workdir takes precedence over the common one, current dir is the last resort
        dst = fspec.workdir or kwargs.get('workdir') or '.'
        destination = os.path.join(dst, fspec.lfn)
        try:
            filesize_cmd, checksum_cmd, checksum_type = _stagefile(coption, fspec.turl, destination, fspec.filesize,
                                                                   is_stagein=True, setup=setup, **kwargs)
            fspec.status_code = 0
            fspec.status = 'transferred'
        except PilotException as error:
            fspec.status = 'failed'
            fspec.status_code = error.get_error_code()
            diagnostics = error.get_detail()
            state = 'STAGEIN_ATTEMPT_FAILED'
            trace_report.update(clientState=state, stateReason=diagnostics, timeEnd=time())
            trace_report.send()
            raise PilotException(diagnostics, code=fspec.status_code, state=state)
        else:
            # compare checksums
            # NOTE(review): checksum_type/checksum_cmd are None when no checksum option was resolved
            # or the output could not be parsed, a None key is stored in that case - confirm intended
            fspec.checksum[checksum_type] = checksum_cmd  # remote checksum
            state, diagnostics = verify_catalog_checksum(fspec, destination)
            if diagnostics != "":
                trace_report.update(clientState=state or 'STAGEIN_ATTEMPT_FAILED', stateReason=diagnostics,
                                    timeEnd=time())
                trace_report.send()
                raise PilotException(diagnostics, code=fspec.status_code, state=state)

        trace_report.update(clientState='DONE', stateReason='OK', timeEnd=time())
        trace_report.send()

    return files
# @timeout(seconds=10800)
def copy_out(files, **kwargs):
    """
    Upload given files using xrdcp command.

    For every file the trace report is updated and sent exactly once with the final state:
    'DONE' only after both the transfer and the checksum verification succeeded, the failure
    state otherwise. Processing stops at the first failing file by raising an exception.

    :param files: list of `FileSpec` objects
    :param kwargs: recognised keys are 'copytools' (removed from kwargs; its ['xrdcp']['setup'] entry
                   is used as setup script) and 'trace_report'; the remaining kwargs are passed
                   through to the command execution
    :return: the given list of files, with status and status_code set on each processed entry
    :raise: PilotException in case of controlled error
    """

    setup = kwargs.pop('copytools', {}).get('xrdcp', {}).get('setup')
    coption = _resolve_checksum_option(setup, **kwargs)
    trace_report = kwargs.get('trace_report')

    for fspec in files:
        trace_report.update(scope=fspec.scope, dataset=fspec.dataset, url=fspec.surl, filesize=fspec.filesize)
        trace_report.update(catStart=time(), filename=fspec.lfn, guid=fspec.guid.replace('-', ''))

        try:
            filesize_cmd, checksum_cmd, checksum_type = _stagefile(coption, fspec.surl, fspec.turl, fspec.filesize,
                                                                   is_stagein=False, setup=setup, **kwargs)
            fspec.status_code = 0
            fspec.status = 'transferred'
        except PilotException as error:
            fspec.status = 'failed'
            fspec.status_code = error.get_error_code()
            state = 'STAGEOUT_ATTEMPT_FAILED'
            diagnostics = error.get_detail()
            trace_report.update(clientState=state, stateReason=diagnostics, timeEnd=time())
            trace_report.send()
            raise PilotException(diagnostics, code=fspec.status_code, state=state)
        else:
            # compare checksums
            # NOTE(review): checksum_type/checksum_cmd are None when no checksum option was resolved
            # or the output could not be parsed, a None key is stored in that case - confirm intended
            fspec.checksum[checksum_type] = checksum_cmd  # remote checksum
            state, diagnostics = verify_catalog_checksum(fspec, fspec.surl)
            if diagnostics != "":
                # this is a stage-out, so fall back to the stage-out failure state
                trace_report.update(clientState=state or 'STAGEOUT_ATTEMPT_FAILED', stateReason=diagnostics,
                                    timeEnd=time())
                trace_report.send()
                raise PilotException(diagnostics, code=fspec.status_code, state=state)

        # report success only once the checksum has been verified, so that a failed
        # verification is not preceded by a 'DONE' trace for the same file
        trace_report.update(clientState='DONE', stateReason='OK', timeEnd=time())
        trace_report.send()

    return files
def get_file_info_from_output(output):
    """
    Extract file size, checksum value and checksum type from xrdcp --cksum command output.

    The first occurrence of `<type>: <checksum> <token> <filesize>` is used, where type is
    md5 or adler32, token is any run of non-blank characters and filesize is a decimal number.
    Output containing none of "xrootd", "XRootD" or "adler32" is rejected without being parsed.

    :param output: output of the copy command (str)
    :return: (filesize [int/None], checksum, checksum_type) or (None, None, None) in case of failure
    """

    if not output:
        return None, None, None

    if not ("xrootd" in output or "XRootD" in output or "adler32" in output):
        logger.warning("WARNING: Failed to extract checksum: Unexpected output: %s", output)
        return None, None, None

    pattern = r"(?P<type>md5|adler32):\ (?P<checksum>[a-zA-Z0-9]+)\ \S+\ (?P<filesize>[0-9]+)"  # Python 3 (added r)
    filesize, checksum, checksum_type = None, None, None

    m = re.search(pattern, output)
    if m:
        checksum_type = m.group('type')
        checksum = m.group('checksum')
        checksum = checksum.zfill(8)  # make it 8 chars length (adler32 xrdcp fix)
        filesize = m.group('filesize')
        if filesize:
            try:
                filesize = int(filesize)
            except ValueError as error:
                logger.warning('failed to convert filesize to int: %s', error)
                filesize = None
    else:
        logger.warning("WARNING: Checksum/file size info not found in output: failed to match pattern=%s in output=%s",
                       pattern, output)

    return filesize, checksum, checksum_type