Source code for pilot.copytool.rucio

#!/usr/bin/env python
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
#
# Authors:
# - Tobias Wegner, tobias.wegner@cern.ch, 2017-2018
# - Alexey Anisenkov, anisyonk@cern.ch, 2018
# - Paul Nilsson, paul.nilsson@cern.ch, 2018-2021
# - Tomas Javurek, tomas.javurek@cern.ch, 2019
# - David Cameron, david.cameron@cern.ch, 2019

from __future__ import absolute_import  # Python 2 (2to3 complains about this)

import os
import json
import logging
from time import time
from copy import deepcopy

from .common import resolve_common_transfer_errors, verify_catalog_checksum, get_timeout
from pilot.common.exception import PilotException, StageOutFailure, ErrorCodes
from pilot.util.timer import timeout, TimedThread

# module-level logger used by all functions in this copytool; the level is forced to DEBUG here
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)

# copytool capability flags
# can be disabled for Rucio if allowed to use all RSE for input
require_replicas = True    ## indicates if given copytool requires input replicas to be resolved
require_protocols = False  ## indicates if given copytool requires protocols to be resolved first for stage-out
# value assigned to the 'tracing' attribute of the rucio download/upload clients created in this module
tracing_rucio = False      ## should Rucio send the trace?


def is_valid_for_copy_in(files):
    """
    Tell whether this copytool accepts the given files for stage-in.

    Placeholder implementation: the input is not inspected and every call succeeds.

    :param files: list of `FileSpec` objects (currently unused)
    :return: always True
    """
    accepted = True  # FIX ME LATER
    return accepted
def is_valid_for_copy_out(files):
    """
    Tell whether this copytool accepts the given files for stage-out.

    Placeholder implementation: the input is not inspected and every call succeeds.

    :param files: list of `FileSpec` objects (currently unused)
    :return: always True
    """
    accepted = True  # FIX ME LATER
    return accepted
def verify_stage_out(fspec):
    """
    Check that the uploaded file is physically present at the destination.

    :param fspec: file specifications (the ddmendpoint, lfn and scope attributes are read)
    :return: whatever the rucio rsemanager exists() call returns for the uploaded file
    """
    # rucio is imported lazily, only when the check is actually performed
    from rucio.rse import rsemanager

    rse_info = rsemanager.get_rse_info(fspec.ddmendpoint)
    did = dict(name=fspec.lfn, scope=fspec.scope)
    logger.info('Checking file: %s', str(fspec.lfn))

    return rsemanager.exists(rse_info, [did])
#@timeout(seconds=10800)
def copy_in(files, **kwargs):
    """
    Download the given files, one at a time, using the rucio copytool.

    For every file the trace report is updated, the transfer is run through the rucio API under a time-out,
    and the local copy is compared with the catalog checksum before the trace is sent.

    :param files: list of `FileSpec` objects
    :param ignore_errors: boolean, if specified then transfer failures will be ignored
    :param trace_report: trace report object, updated and sent for every file
    :param use_pcache: forwarded to the rucio API stage-in function
    :param workdir: download directory used when the file spec has no workdir of its own
    :return: the input list of files
    :raise: PilotException in case of controlled error
    """

    ignore_errors = kwargs.get('ignore_errors')
    trace_report = kwargs.get('trace_report')
    use_pcache = kwargs.get('use_pcache')

    def report_and_raise(filespec, details):
        # send the trace first, then report the (already resolved) transfer error to the caller
        trace_report.send()
        msg = ' %s:%s from %s, %s' % (filespec.scope, filespec.lfn, filespec.ddmendpoint, details.get('error'))
        raise PilotException(msg, code=details.get('rcode'), state=details.get('state'))

    # don't spoil the output, we depend on stderr parsing
    os.environ['RUCIO_LOGGING_FORMAT'] = '%(asctime)s %(levelname)s [%(message)s]'

    logger.debug('RUCIO_LOCAL_SITE_ID=%s', os.environ.get('RUCIO_LOCAL_SITE_ID', '<unknown>'))
    logger.debug('trace_report[localSite]=%s', trace_report.get_value('localSite'))
    # note, env vars might be unknown inside middleware containers, if so get the value already in the trace report
    localsite = os.environ.get('RUCIO_LOCAL_SITE_ID', trace_report.get_value('localSite'))

    for fspec in files:
        logger.info('rucio copytool, downloading file with scope:%s lfn:%s', str(fspec.scope), str(fspec.lfn))

        # update the trace report
        if not localsite:
            localsite = fspec.ddmendpoint
        trace_report.update(localSite=localsite, remoteSite=fspec.ddmendpoint, filesize=fspec.filesize)
        trace_report.update(filename=fspec.lfn, guid=fspec.guid.replace('-', ''))
        trace_report.update(scope=fspec.scope, dataset=fspec.dataset)
        trace_report.update(url=fspec.turl or fspec.surl)
        trace_report.update(catStart=time())  ## is this metric still needed? LFC catalog

        fspec.status_code = 0
        dst = fspec.workdir or kwargs.get('workdir') or '.'
        logger.info('the file will be stored in %s', str(dst))

        trace_report_out = []
        transfer_timeout = get_timeout(fspec.filesize)
        ctimeout = transfer_timeout + 10  # give the API a chance to do the time-out first
        logger.info('overall transfer timeout=%s', ctimeout)

        error_msg = ""
        ec = 0
        guarded_stage_in = timeout(ctimeout, timer=TimedThread)(_stage_in_api)
        try:
            ec, trace_report_out = guarded_stage_in(dst, fspec, trace_report, trace_report_out,
                                                    transfer_timeout, use_pcache)
        except Exception as error:
            error_msg = str(error)
            error_details = handle_rucio_error(error_msg, trace_report, trace_report_out, fspec, stagein=True)
            trace_report.update(protocol=get_protocol(trace_report_out))
            if not ignore_errors:
                report_and_raise(fspec, error_details)
        else:
            trace_report.update(protocol=get_protocol(trace_report_out))

        # make sure there was no missed failure (only way to deal with this until rucio API has been fixed)
        # (using the timeout decorator prevents the trace_report_out from being updated - rucio API should return
        # the proper error immediately instead of encoding it into a dictionary)
        state_reason = trace_report_out[0].get('stateReason') if trace_report_out else None
        if ec and state_reason and not error_msg:
            error_details = handle_rucio_error(state_reason, trace_report, trace_report_out, fspec, stagein=True)
            if not ignore_errors:
                report_and_raise(fspec, error_details)

        # verify checksum; compare local checksum with catalog value (fspec.checksum), use same checksum type
        destination = os.path.join(dst, fspec.lfn)
        if not os.path.exists(destination):
            # nothing to verify; this is fatal regardless of ignore_errors
            diagnostics = 'file does not exist: %s (cannot verify catalog checksum)' % destination
            logger.warning(diagnostics)
            state = 'STAGEIN_ATTEMPT_FAILED'
            fspec.status_code = ErrorCodes.STAGEINFAILED
            trace_report.update(clientState=state, stateReason=diagnostics, timeEnd=time())
            trace_report.send()
            raise PilotException(diagnostics, code=fspec.status_code, state=state)

        state, diagnostics = verify_catalog_checksum(fspec, destination)
        if diagnostics != "" and not ignore_errors:
            trace_report.update(clientState=state or 'STAGEIN_ATTEMPT_FAILED', stateReason=diagnostics,
                                timeEnd=time())
            trace_report.send()
            raise PilotException(diagnostics, code=fspec.status_code, state=state)

        # only a file without any recorded error code is marked as transferred
        if not fspec.status_code:
            fspec.status_code = 0
            fspec.status = 'transferred'
            trace_report.update(clientState='DONE', stateReason='OK', timeEnd=time())

        trace_report.send()

    return files
def get_protocol(trace_report_out):
    """
    Extract the protocol used for the transfer from the dictionary returned by rucio.

    Only the first entry of the list is consulted. If that entry cannot be read at all (e.g. the list
    is empty), a warning is logged and an empty string is returned.

    :param trace_report_out: returned rucio transfer dictionary (list of dictionaries).
    :return: value stored under 'protocol' in the first entry, or '' if the entry could not be read.
    """

    protocol = ''
    try:
        first_trace = trace_report_out[0]
        protocol = first_trace.get('protocol')
    except Exception as exc:
        logger.warning('exception caught: %s', exc)

    return protocol
def handle_rucio_error(error_msg, trace_report, trace_report_out, fspec, stagein=True):
    """
    Resolve a rucio transfer error and record it in the file spec and the trace report.

    The 'stateReason' of the first returned trace is preferred over the given error message, unless it is
    empty or 'OK'. The file spec is marked as failed and the trace report gets the resolved state and reason.

    :param error_msg: error message from the failed transfer (string).
    :param trace_report: trace report object to be updated.
    :param trace_report_out: traces returned by rucio (list of dictionaries, may be empty).
    :param fspec: file specifications; status and status_code are set.
    :param stagein: True for stage-in, False for stage-out (boolean).
    :return: error details as returned by resolve_common_transfer_errors() (dictionary).
    """

    # try to get a better error message from the traces
    fallback_msg = error_msg
    if not trace_report_out:
        logger.debug('no trace_report_out')
    else:
        logger.debug('reading stateReason from trace_report_out: %s', trace_report_out)
        error_msg = trace_report_out[0].get('stateReason', '')
        if error_msg == 'OK' or not error_msg:
            logger.warning('could not extract error message from trace report - reverting to original error message')
            error_msg = fallback_msg

    logger.info('rucio returned an error: "%s"', error_msg)

    error_details = resolve_common_transfer_errors(error_msg, is_stagein=stagein)
    fspec.status = 'failed'
    fspec.status_code = error_details.get('rcode')

    # default client state, used only when the resolved error has no 'state' entry
    if stagein:
        default_state = 'STAGEIN_ATTEMPT_FAILED'
    else:
        default_state = 'STAGEOUT_ATTEMPT_FAILED'
    trace_report.update(clientState=error_details.get('state', default_state),
                        stateReason=error_details.get('error'),
                        timeEnd=time())

    return error_details
def copy_in_bulk(files, **kwargs):
    """
    Download the given files in one bulk operation using the rucio copytool.

    The download is delegated to _stage_in_bulk(). Afterwards each file is matched with its rucio trace,
    the local copy is verified against the catalog checksum and the trace is sent. A file without exactly
    one matching trace is marked as failed.

    :param files: list of `FileSpec` objects
    :param ignore_errors: boolean, if specified then transfer failures will be ignored
    :param trace_report: trace report object holding the fields common to all files
    :param workdir: download directory used when a file spec has no workdir of its own
    :return: list of successfully downloaded `FileSpec` objects
    :raise: PilotException in case of controlled error
    """

    ignore_errors = kwargs.get('ignore_errors')
    trace_common_fields = kwargs.get('trace_report')

    # don't spoil the output, we depend on stderr parsing
    os.environ['RUCIO_LOGGING_FORMAT'] = '%(asctime)s %(levelname)s [%(message)s]'

    dst = kwargs.get('workdir') or '.'

    # THE DOWNLOAD
    trace_report_out = []
    try:
        _stage_in_bulk(dst, files, trace_report_out, trace_common_fields)
    except Exception as error:
        error_msg = str(error)
        # Fill and send the traces if none were received from Rucio, then abort the download process.
        # If Rucio raised an exception but still returned some traces, continue to the VALIDATION section.
        if not trace_report_out:
            trace_report = deepcopy(trace_common_fields)
            localsite = os.environ.get('RUCIO_LOCAL_SITE_ID', None)
            diagnostics = 'None of the traces received from Rucio. Response from Rucio: %s' % error_msg
            fspec = None  # stays None when the file list is empty
            for fspec in files:
                localsite = localsite if localsite else fspec.ddmendpoint
                trace_report.update(localSite=localsite, remoteSite=fspec.ddmendpoint, filesize=fspec.filesize)
                trace_report.update(filename=fspec.lfn, guid=fspec.guid.replace('-', ''))
                trace_report.update(scope=fspec.scope, dataset=fspec.dataset)
                # the client state must be passed by keyword, like every other trace field
                trace_report.update(clientState='STAGEIN_ATTEMPT_FAILED', stateReason=diagnostics, timeEnd=time())
                trace_report.send()
            logger.error(diagnostics)
            # as before, the status code of the last file is reported; fall back to a generic code if no files
            code = fspec.status_code if fspec is not None else ErrorCodes.STAGEINFAILED
            raise PilotException(diagnostics, code=code, state='STAGEIN_ATTEMPT_FAILED')

    # VALIDATION AND TERMINATION
    files_done = []
    for fspec in files:

        # get the trace for the given file
        # if one trace is missing, the whole stage-in fails
        trace_candidates = _get_trace(fspec, trace_report_out)
        if len(trace_candidates) != 1:
            if not trace_candidates:
                logger.error('No trace retrieved for given file. %s', fspec.lfn)
            else:
                logger.error('Rucio returned too many traces for given file. %s', fspec.lfn)
            # without a unique trace there is nothing to update or send: fail the file and move on
            fspec.status_code = ErrorCodes.STAGEINFAILED
            logger.error('An unknown error occurred when handling the traces. %s', fspec.lfn)
            logger.warning('No trace sent!!!')
            continue

        trace_report = trace_candidates[0]
        # prefer the protocol recorded in the file's own trace, fall back to the first returned trace
        trace_report.update(protocol=trace_report.get('protocol') or get_protocol(trace_report_out))

        # verify checksum; compare local checksum with catalog value (fspec.checksum), use same checksum type
        # (look in the same directory that _stage_in_bulk() used as base_dir for this file)
        destination = os.path.join(fspec.workdir or dst, fspec.lfn)
        if os.path.exists(destination):
            state, diagnostics = verify_catalog_checksum(fspec, destination)
            if diagnostics != "" and not ignore_errors:  # caution, validation against empty string
                trace_report.update(clientState=state or 'STAGEIN_ATTEMPT_FAILED', stateReason=diagnostics,
                                    timeEnd=time())
                logger.error(diagnostics)
        else:
            diagnostics = 'file does not exist: %s (cannot verify catalog checksum)' % destination
            state = 'STAGEIN_ATTEMPT_FAILED'
            fspec.status_code = ErrorCodes.STAGEINFAILED
            trace_report.update(clientState=state, stateReason=diagnostics, timeEnd=time())
            logger.error(diagnostics)

        if not fspec.status_code:
            fspec.status_code = 0
            fspec.status = 'transferred'
            trace_report.update(clientState='DONE', stateReason='OK', timeEnd=time())
            files_done.append(fspec)

        # update the trace and send it
        # NOTE(review): assumes the trace objects returned by rucio expose send() - confirm
        trace_report.update(guid=fspec.guid.replace('-', ''))
        trace_report.send()

    if len(files_done) != len(files):
        raise PilotException('Not all files downloaded.', code=ErrorCodes.STAGEINFAILED,
                             state='STAGEIN_ATTEMPT_FAILED')

    return files_done
def _get_trace(fspec, traces):
    """
    Find the rucio trace(s) belonging to the given file.

    Traces returned by Rucio are not ordered the same way as the input files from pilot, so the traces
    are matched on file name and scope. A trace lacking 'filename' or 'scope' is skipped instead of
    aborting the whole search.

    :param fspec: the file that is sought (its lfn and scope attributes are compared)
    :param traces: all traces received from Rucio (list of dictionaries)
    :return: list of traces that correspond to the given file ([] if none match or the traces are unusable)
    """

    try:
        trace_candidates = [trace for trace in traces
                            if 'filename' in trace and 'scope' in trace
                            and trace['filename'] == fspec.lfn and trace['scope'] == fspec.scope]
    except Exception as error:
        # best effort: unusable trace data must not break the caller
        logger.warning('Traces from pilot and rucio could not be merged: %s', str(error))
        return []

    if not trace_candidates:
        logger.warning('File does not match to any trace received from Rucio: %s %s', fspec.lfn, fspec.scope)

    return trace_candidates
#@timeout(seconds=10800)
def copy_out(files, **kwargs):  # noqa: C901
    """
    Upload the given files, one at a time, using the rucio copytool.

    For every file the trace report is updated and the upload is run through the rucio API under a time-out.
    If a summary is requested, the final pfn and the remote adler32 checksum are read from the rucio summary
    JSON and the checksum is compared with the local value.

    :param files: list of `FileSpec` objects
    :param ignore_errors: boolean, if specified then transfer failures will be ignored
    :param summary: boolean (default True), whether the rucio summary JSON is written and read back
    :param trace_report: trace report object, updated and sent for every file
    :param workdir: directory for the summary file when the file spec has no workdir of its own
    :return: the input list of files
    :raise: PilotException in case of controlled error
    """

    # don't spoil the output, we depend on stderr parsing
    os.environ['RUCIO_LOGGING_FORMAT'] = '%(asctime)s %(levelname)s [%(message)s]'

    summary = kwargs.pop('summary', True)
    ignore_errors = kwargs.pop('ignore_errors', False)
    trace_report = kwargs.get('trace_report')

    def report_and_raise(filespec, details):
        # send the trace first, then report the (already resolved) upload error to the caller
        trace_report.send()
        msg = ' %s:%s to %s, %s' % (filespec.scope, filespec.lfn, filespec.ddmendpoint, details.get('error'))
        raise PilotException(msg, code=details.get('rcode'), state=details.get('state'))

    localsite = os.environ.get('RUCIO_LOCAL_SITE_ID', None)
    for fspec in files:
        logger.info('rucio copytool, uploading file with scope: %s and lfn: %s', str(fspec.scope), str(fspec.lfn))
        localsite = localsite if localsite else fspec.ddmendpoint
        trace_report.update(localSite=localsite, remoteSite=fspec.ddmendpoint)
        trace_report.update(scope=fspec.scope, dataset=fspec.dataset, url=fspec.surl, filesize=fspec.filesize)
        trace_report.update(catStart=time(), filename=fspec.lfn, guid=fspec.guid.replace('-', ''))
        fspec.status_code = 0

        summary_file_path = None
        cwd = fspec.workdir or kwargs.get('workdir') or '.'
        if summary:
            summary_file_path = os.path.join(cwd, 'rucio_upload.json')

        logger.info('the file will be uploaded to %s', str(fspec.ddmendpoint))
        trace_report_out = []
        transfer_timeout = get_timeout(fspec.filesize)
        ctimeout = transfer_timeout + 10  # give the API a chance to do the time-out first
        logger.info('overall transfer timeout=%s', ctimeout)

        error_msg = ""
        ec = 0
        try:
            # same call style as in copy_in()
            ec, trace_report_out = timeout(ctimeout, timer=TimedThread)(_stage_out_api)(fspec, summary_file_path,
                                                                                       trace_report,
                                                                                       trace_report_out,
                                                                                       transfer_timeout)
        except (PilotException, Exception) as error:
            # controlled and unexpected errors are handled identically
            error_msg = str(error)
            error_details = handle_rucio_error(error_msg, trace_report, trace_report_out, fspec, stagein=False)
            trace_report.update(protocol=get_protocol(trace_report_out))
            if not ignore_errors:
                report_and_raise(fspec, error_details)
        else:
            trace_report.update(protocol=get_protocol(trace_report_out))

        # make sure there was no missed failure (only way to deal with this until rucio API has been fixed)
        # (using the timeout decorator prevents the trace_report_out from being updated - rucio API should return
        # the proper error immediately instead of encoding it into a dictionary)
        state_reason = None if not trace_report_out else trace_report_out[0].get('stateReason')
        if ec and state_reason and not error_msg:
            error_details = handle_rucio_error(state_reason, trace_report, trace_report_out, fspec, stagein=False)
            if not ignore_errors:
                report_and_raise(fspec, error_details)

        if summary:  # resolve final pfn (turl) from the summary JSON
            if not os.path.exists(summary_file_path):
                logger.error('Failed to resolve Rucio summary JSON, wrong path? file=%s', summary_file_path)
            else:
                with open(summary_file_path, 'rb') as f:
                    summary_json = json.load(f)

                # the summary is keyed by "<scope>:<lfn>"
                dat = summary_json.get("%s:%s" % (fspec.scope, fspec.lfn)) or {}
                fspec.turl = dat.get('pfn')
                logger.debug('set turl=%s', fspec.turl)

                # quick transfer verification:
                # the logic should be unified and moved to base layer shared for all the movers
                adler32 = dat.get('adler32')
                local_checksum = fspec.checksum.get('adler32')
                if local_checksum and adler32 and local_checksum != adler32:
                    msg = 'checksum verification failed: local %s != remote %s' % (local_checksum, adler32)
                    logger.warning(msg)
                    fspec.status = 'failed'
                    fspec.status_code = ErrorCodes.PUTADMISMATCH
                    trace_report.update(clientState='AD_MISMATCH', stateReason=msg, timeEnd=time())
                    trace_report.send()
                    if not ignore_errors:
                        raise PilotException("Failed to stageout: CRC mismatched",
                                             code=ErrorCodes.PUTADMISMATCH, state='AD_MISMATCH')
                elif local_checksum and adler32:
                    # both values are known and (per the branch above) equal
                    logger.info('local checksum (%s) = remote checksum (%s)', local_checksum, adler32)
                else:
                    logger.warning('checksum could not be verified: local checksum (%s), remote checksum (%s)',
                                   str(local_checksum), str(adler32))

        # only a file without any recorded error code is marked as transferred
        if not fspec.status_code:
            fspec.status_code = 0
            fspec.status = 'transferred'
            trace_report.update(clientState='DONE', stateReason='OK', timeEnd=time())

        trace_report.send()

    return files
# stageIn using rucio api.
def _stage_in_api(dst, fspec, trace_report, trace_report_out, transfer_timeout, use_pcache):
    """
    Stage in a single file using the rucio download client.

    download_pfns() is used when the file spec has a turl, otherwise download_dids(). If the client raises an
    exception but the first returned trace carries a 'stateReason', the exception is swallowed and reported
    through the exit code instead.

    :param dst: destination directory, used as base_dir for the download (string).
    :param fspec: file specifications.
    :param trace_report: trace report; if set, it is passed to rucio as custom trace fields.
    :param trace_report_out: list handed to rucio as traces_copy_out.
    :param transfer_timeout: transfer time-out; only passed to rucio if set.
    :param use_pcache: if set, check_pcache is enabled on the download client.
    :return: exit code (0, or -1 for a failure described by the returned traces), trace_report_out.
    :raise: the exception from the download client if no error info can be extracted from the traces.
    """

    # init. download client
    from rucio.client.downloadclient import DownloadClient

    exit_code = 0
    client = DownloadClient(logger=logger)
    if use_pcache:
        client.check_pcache = True

    # traces are switched off
    if hasattr(client, 'tracing'):
        client.tracing = tracing_rucio

    # file specifications before the actual download
    item = {
        'did_scope': fspec.scope,
        'did_name': fspec.lfn,
        'did': '%s:%s' % (fspec.scope, fspec.lfn),
        'rse': fspec.ddmendpoint,
        'base_dir': dst,
        'no_subdir': True,
    }
    if fspec.turl:
        item['pfn'] = fspec.turl
    if transfer_timeout:
        item['transfer_timeout'] = transfer_timeout
    item['connection_timeout'] = 60 * 60

    # proceed with the download
    logger.info('rucio API stage-in dictionary: %s', item)
    trace_pattern = trace_report if trace_report else {}

    # download client raises an exception if any file failed
    try:
        logger.info('*** rucio API downloading file (taking over logging) ***')
        if not fspec.turl:
            result = client.download_dids([item], trace_custom_fields=trace_pattern,
                                          traces_copy_out=trace_report_out)
        else:
            result = client.download_pfns([item], 1, trace_custom_fields=trace_pattern,
                                          traces_copy_out=trace_report_out)
    except Exception as error:
        logger.warning('*** rucio API download client failed ***')
        logger.warning('caught exception: %s', error)
        logger.debug('trace_report_out=%s', trace_report_out)
        # only raise an exception if the error info cannot be extracted
        if not trace_report_out or not trace_report_out[0].get('stateReason'):
            raise error
        exit_code = -1
    else:
        logger.info('*** rucio API download client finished ***')
        logger.debug('client returned %s', result)

    logger.debug('trace_report_out=%s', trace_report_out)

    return exit_code, trace_report_out
def _stage_in_bulk(dst, files, trace_report_out=None, trace_common_fields=None):
    """
    Stage-in files in bulk using the Rucio API.

    One download dictionary is built per file and all of them are handed to download_pfns() in a single
    call, with one thread per file. The status_code of every file spec is reset to 0 first.

    :param dst: destination directory, used when a file spec has no workdir of its own (string).
    :param files: list of fspec objects.
    :param trace_report_out: list handed to rucio as traces_copy_out.
    :param trace_common_fields: trace fields common to all files, passed to rucio as custom trace fields.
    :return: None
    :raise: the exception from the download client if no error info can be extracted from the traces.
    """

    # init. download client
    from rucio.client.downloadclient import DownloadClient
    client = DownloadClient(logger=logger)

    # traces are switched off
    if hasattr(client, 'tracing'):
        client.tracing = tracing_rucio

    # build the list of file dictionaries before calling the download function
    items = []
    for fspec in files:
        fspec.status_code = 0

        # file specifications before the actual download
        item = {
            'did_scope': fspec.scope,
            'did_name': fspec.lfn,
            'did': '%s:%s' % (fspec.scope, fspec.lfn),
            'rse': fspec.ddmendpoint,
            'base_dir': fspec.workdir or dst,
            'no_subdir': True,
        }
        if not fspec.turl:
            # fail somehow
            logger.warning('cannot perform bulk download since fspec.turl is not set (required by download_pfns()')
        else:
            item['pfn'] = fspec.turl

        if fspec.filesize:
            item['transfer_timeout'] = get_timeout(fspec.filesize)
        item['connection_timeout'] = 60 * 60

        items.append(item)

    # proceed with the download
    trace_pattern = trace_common_fields or {}

    # download client raises an exception if any file failed
    num_threads = len(items)
    logger.info('*** rucio API downloading files (taking over logging) ***')
    try:
        result = client.download_pfns(items, num_threads, trace_custom_fields=trace_pattern,
                                      traces_copy_out=trace_report_out)
    except Exception as error:
        logger.warning('*** rucio API download client failed ***')
        logger.warning('caught exception: %s', error)
        logger.debug('trace_report_out=%s', trace_report_out)
        # only raise an exception if the error info cannot be extracted
        if not trace_report_out or not trace_report_out[0].get('stateReason'):
            raise error
    else:
        logger.info('*** rucio API download client finished ***')
        logger.debug('client returned %s', result)
def _stage_out_api(fspec, summary_file_path, trace_report, trace_report_out, transfer_timeout):
    """
    Stage out a single file using the rucio upload client and verify that it exists at the destination.

    If the client raises an exception but the first returned trace carries a 'stateReason', the exception is
    swallowed and reported through the exit code instead. An UnboundLocalError from the client is tolerated
    (known problem with the summary handling in the upload client). The physical existence check is
    performed in all cases that did not raise.

    :param fspec: file specifications.
    :param summary_file_path: path of the summary JSON to be written by rucio, or None.
    :param trace_report: trace report; handed to the upload client only if rucio tracing is enabled.
    :param trace_report_out: list handed to rucio as traces_copy_out.
    :param transfer_timeout: transfer time-out; only passed to rucio if set.
    :return: exit code (0, or -1 for a failure described by the returned traces), trace_report_out.
    :raise: the exception from the upload client if no error info can be extracted from the traces,
            StageOutFailure if the uploaded file cannot be verified at the destination.
    """

    ec = 0

    # init. upload client
    from rucio.client.uploadclient import UploadClient
    upload_client = UploadClient(logger=logger)

    # traces are turned off
    if hasattr(upload_client, 'tracing'):
        upload_client.tracing = tracing_rucio
    if tracing_rucio:
        upload_client.trace = trace_report

    # file specifications before the upload
    f = {
        'path': fspec.surl or getattr(fspec, 'pfn', None) or os.path.join(fspec.workdir, fspec.lfn),
        'rse': fspec.ddmendpoint,
        'did_scope': fspec.scope,
        'no_register': True,
    }
    if transfer_timeout:
        f['transfer_timeout'] = transfer_timeout
    f['connection_timeout'] = 60 * 60

    # if fspec.storageId and int(fspec.storageId) > 0:
    #     if fspec.turl and fspec.is_nondeterministic:
    #         f['pfn'] = fspec.turl
    # elif fspec.lfn and '.root' in fspec.lfn:
    #     f['guid'] = fspec.guid
    if fspec.lfn and '.root' in fspec.lfn:
        f['guid'] = fspec.guid

    logger.info('rucio API stage-out dictionary: %s', f)

    # upload client raises an exception if any file failed
    try:
        logger.info('*** rucio API uploading file (taking over logging) ***')
        logger.debug('summary_file_path=%s', summary_file_path)
        logger.debug('trace_report_out=%s', trace_report_out)
        result = upload_client.upload([f], summary_file_path=summary_file_path, traces_copy_out=trace_report_out)
    except UnboundLocalError:
        # must be listed before the generic handler: UnboundLocalError is a subclass of Exception and
        # would otherwise never reach this branch
        logger.warning('*** rucio API upload client failed ***')
        logger.warning('rucio still needs a bug fix of the summary in the uploadclient')
    except Exception as error:
        logger.warning('*** rucio API upload client failed ***')
        logger.warning('caught exception: %s', error)
        import traceback
        logger.error(traceback.format_exc())
        logger.debug('trace_report_out=%s', trace_report_out)
        # only raise an exception if the error info cannot be extracted
        if not trace_report_out or not trace_report_out[0].get('stateReason'):
            raise error
        ec = -1
    else:
        logger.info('*** rucio API upload client finished ***')
        logger.debug('client returned %s', result)

    # physical check of the uploaded file; any problem here is reported as a stage-out failure
    try:
        file_exists = verify_stage_out(fspec)
        logger.info('file exists at the storage: %s', str(file_exists))
        if not file_exists:
            raise StageOutFailure('physical check after upload failed')
    except Exception as error:
        msg = 'file existence verification failed with: %s' % error
        logger.warning(msg)
        raise StageOutFailure(msg)

    return ec, trace_report_out