Source code for pilot.copytool.common

#!/usr/bin/env python
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
#
# Authors:
# - Tobias Wegner, tobias.wegner@cern.ch, 2017
# - Paul Nilsson, paul.nilsson@cern.ch, 2017-2021
# - Mario Lassnig, mario.lassnig@cern.ch, 2020

import logging
import os
import re

from pilot.common.errorcodes import ErrorCodes
from pilot.util.filehandling import calculate_checksum, get_checksum_type, get_checksum_value

logger = logging.getLogger(__name__)


[docs] def get_timeout(filesize, add=0): """ Get a proper time-out limit based on the file size. :param filesize: file size (int). :param add: optional additional time to be added [s] (int) :return: time-out in seconds (int). """ timeout_max = 3 * 3600 # 3 hours timeout_min = 300 # self.timeout timeout = timeout_min + int(filesize / 0.1e7) + add # approx < 1 Mb/sec return min(timeout, timeout_max)
[docs] def verify_catalog_checksum(fspec, path): """ Verify that the local and remote (fspec) checksum values are the same. The function will update the fspec object. :param fspec: FileSpec object for a given file. :param path: path to local file (string). :return: state (string), diagnostics (string). """ diagnostics = "" state = "" checksum_type = get_checksum_type(fspec.checksum) checksum_catalog = get_checksum_value(fspec.checksum) if checksum_type == 'unknown': diagnostics = 'unknown checksum type for checksum(catalog): %s' % fspec.checksum logger.warning(diagnostics) fspec.status_code = ErrorCodes.UNKNOWNCHECKSUMTYPE fspec.status = 'failed' state = 'UNKNOWN_CHECKSUM_TYPE' else: checksum_local = calculate_checksum(path, algorithm=checksum_type) if checksum_type == 'ad32': checksum_type = 'adler32' logger.info('checksum (catalog): %s (type: %s)', checksum_catalog, checksum_type) logger.info('checksum (local): %s', checksum_local) if checksum_local and checksum_local != '' and checksum_local != checksum_catalog: diagnostics = 'checksum verification failed for LFN=%s: checksum (catalog)=%s != checksum (local)=%s' % \ (fspec.lfn, checksum_catalog, checksum_local) logger.warning(diagnostics) fspec.status_code = ErrorCodes.GETADMISMATCH if checksum_type == 'adler32' else ErrorCodes.GETMD5MISMATCH fspec.status = 'failed' state = 'AD_MISMATCH' if checksum_type == 'ad32' else 'MD_MISMATCH' else: logger.info('catalog and local checksum values are the same') return state, diagnostics
[docs] def merge_destinations(files): """ Converts the file-with-destination dict to a destination-with-files dict :param files Files to merge :returns destination-with-files dictionary """ destinations = {} # ensure type(files) == list for f in files: # ensure destination in f if not os.path.exists(f['destination']): f['status'] = 'failed' f['errmsg'] = 'Destination directory does not exist: %s' % f['destination'] f['errno'] = 1 else: # ensure scope, name in f f['status'] = 'running' f['errmsg'] = 'File not yet successfully downloaded.' f['errno'] = 2 lfn = '%s:%s' % (f['scope'], f['name']) dst = destinations.setdefault(f['destination'], {'lfns': set(), 'files': list()}) dst['lfns'].add(lfn) dst['files'].append(f) return destinations
[docs] def get_copysetup(copytools, copytool_name): """ Return the copysetup for the given copytool. :param copytools: copytools list from infosys. :param copytool name: name of copytool (string). :return: copysetup (string). """ copysetup = "" if not copytools: return "" for ct in list(copytools.keys()): # Python 2/3 if copytool_name == ct: copysetup = copytools[ct].get('setup') break return copysetup
[docs] def get_error_info(rcode, state, error_msg): """ Return an error info dictionary specific to transfer errors. Helper function to resolve_common_transfer_errors(). :param rcode: return code (int). :param state: state string used in Rucio traces. :param error_msg: transfer command stdout (string). :return: dictionary with format {'rcode': rcode, 'state': state, 'error': error_msg}. """ return {'rcode': rcode, 'state': state, 'error': error_msg}
[docs] def output_line_scan(ret, output): """ Do some reg exp on the transfer command output to search for special errors. Helper function to resolve_common_transfer_errors(). :param ret: pre-filled error info dictionary with format {'rcode': rcode, 'state': state, 'error': error_msg} :param output: transfer command stdout (string). :return: updated error info dictionary. """ for line in output.split('\n'): m = re.search(r"[Dd]etails\s*:\s*(?P<error>.*)", line) # Python 3 (added r) if m: ret['error'] = m.group('error') elif 'service_unavailable' in line: ret['error'] = 'service_unavailable' ret['rcode'] = ErrorCodes.RUCIOSERVICEUNAVAILABLE return ret
[docs] def resolve_common_transfer_errors(output, is_stagein=True): # noqa: C901 """ Resolve any common transfer related errors. :param output: stdout from transfer command (string). :param is_stagein: optional (boolean). :return: dict {'rcode': rcode, 'state': state, 'error': error_msg}. """ # default to make sure dictionary exists and all fields are populated (some of which might be overwritten below) ret = get_error_info(ErrorCodes.STAGEINFAILED if is_stagein else ErrorCodes.STAGEOUTFAILED, 'COPY_ERROR', output) if not output: return ret if "timeout" in output: ret = get_error_info(ErrorCodes.STAGEINTIMEOUT if is_stagein else ErrorCodes.STAGEOUTTIMEOUT, 'CP_TIMEOUT', 'copy command timed out: %s' % output) elif "failed xrdadler32" in output: ret = get_error_info(ErrorCodes.GETADMISMATCH if is_stagein else ErrorCodes.PUTADMISMATCH, 'AD_MISMATCH', output) elif "does not match the checksum" in output and 'adler32' in output: ret = get_error_info(ErrorCodes.GETADMISMATCH if is_stagein else ErrorCodes.PUTADMISMATCH, 'AD_MISMATCH', output) elif "does not match the checksum" in output and 'adler32' not in output: ret = get_error_info(ErrorCodes.GETMD5MISMATCH if is_stagein else ErrorCodes.PUTMD5MISMATCH, 'MD5_MISMATCH', output) elif "globus_xio:" in output: ret = get_error_info(ErrorCodes.GETGLOBUSSYSERR if is_stagein else ErrorCodes.PUTGLOBUSSYSERR, 'GLOBUS_FAIL', "Globus system error: %s" % output) elif "File exists" in output or 'SRM_FILE_BUSY' in output or 'file already exists' in output: ret = get_error_info(ErrorCodes.FILEEXISTS, 'FILE_EXISTS', "File already exists in the destination: %s" % output) elif "No such file or directory" in output and is_stagein: ret = get_error_info(ErrorCodes.MISSINGINPUTFILE, 'MISSING_INPUT', output) elif "query chksum is not supported" in output or "Unable to checksum" in output: ret = get_error_info(ErrorCodes.CHKSUMNOTSUP, 'CHKSUM_NOTSUP', output) elif "Could not establish context" in output: error_msg = "Could not establish context: Proxy / VO extension of proxy has probably expired: %s" % output ret = get_error_info(ErrorCodes.NOPROXY, 'CONTEXT_FAIL', error_msg) elif "No space left on device" in output: ret = get_error_info(ErrorCodes.NOLOCALSPACE if is_stagein else ErrorCodes.NOREMOTESPACE, 'NO_SPACE', "No available space left on disk: %s" % output) elif "No such file or directory" in output: ret = get_error_info(ErrorCodes.NOSUCHFILE, 'NO_FILE', output) elif "service is not available at the moment" in output: ret = get_error_info(ErrorCodes.SERVICENOTAVAILABLE, 'SERVICE_ERROR', output) elif "Network is unreachable" in output: ret = get_error_info(ErrorCodes.UNREACHABLENETWORK, 'NETWORK_UNREACHABLE', output) elif "Run: [ERROR] Server responded with an error" in output: ret = get_error_info(ErrorCodes.XRDCPERROR, 'XRDCP_ERROR', output) elif "Unable to locate credentials" in output: ret = get_error_info(ErrorCodes.MISSINGCREDENTIALS, 'S3_ERROR', output) # reg exp the output to get real error message ret = output_line_scan(ret, output) return ret