Source code for pilot.copytool.lsm

#!/usr/bin/env python
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
#
# Authors:
# - Pavlo Svirin, pavlo.svirin@cern.ch, 2017
# - Tobias Wegner, tobias.wegner@cern.ch, 2018
# - Paul Nilsson, paul.nilsson@cern.ch, 2018-2021

import os
import logging
import errno
from time import time

from .common import get_copysetup, verify_catalog_checksum, resolve_common_transfer_errors  #, get_timeout
from pilot.common.exception import StageInFailure, StageOutFailure, PilotException, ErrorCodes
from pilot.util.container import execute
#from pilot.util.timer import timeout


# module-level logger, named after this module
logger = logging.getLogger(__name__)

# indicate if given copytool requires input replicas to be resolved
require_replicas = True

# prioritized list of supported schemas for transfers by given copytool
allowed_schemas = ['srm', 'gsiftp', 'root']


def is_valid_for_copy_in(files):
    """
    Tell whether the given files can be staged in by this copytool.

    No validation is performed yet: every input is accepted.

    :param files: files to be staged in (currently not inspected).
    :return: always True (boolean).
    """

    # FIX ME LATER - intended validation, currently disabled:
    # for f in files:
    #     if not all(key in f for key in ('name', 'source', 'destination')):
    #         return False
    accepted = True
    return accepted
def is_valid_for_copy_out(files):
    """
    Tell whether the given files can be staged out by this copytool.

    No validation is performed yet: every input is accepted.

    :param files: files to be staged out (currently not inspected).
    :return: always True (boolean).
    """

    # intended validation, currently disabled:
    # for f in files:
    #     if not all(key in f for key in ('name', 'source', 'destination')):
    #         return False
    accepted = True
    return accepted
def copy_in_old(files):
    """
    Download the given files by calling lsm-get directly (legacy entry point).

    :param files: Files to download
    :raises PilotException: StageInFailure if the lsm-get tool is not found or a transfer fails
    """

    lsm_available = check_for_lsm(dst_in=True)
    if not lsm_available:
        raise StageInFailure("No LSM tools found")

    exit_code, stdout, _stderr = move_all_files_in(files)
    if exit_code == 0:
        return

    # the transfer failed; stdout of the last attempt is used as the failure message
    raise StageInFailure(stdout)
def copy_in(files, **kwargs):
    """
    Stage in the given files, one at a time, with the lsm-get command.

    For every file the trace report is updated and sent, both on failure and on success.
    After a successful transfer the local file is verified against the catalog checksum.

    :param files: list of `FileSpec` objects.
    :param kwargs: 'copytools', 'trace_report' and 'workdir' are read from here.
    :raise: PilotException in case of controlled error.
    :return: files `FileSpec` object.
    """

    copytools = kwargs.get('copytools') or []
    copysetup = get_copysetup(copytools, 'lsm')
    trace_report = kwargs.get('trace_report')

    # note, env vars might be unknown inside middleware containers, if so use the value already in the trace report
    site_from_trace = trace_report.get_value('localSite')
    localsite = os.environ.get('RUCIO_LOCAL_SITE_ID', site_from_trace)

    for fspec in files:
        # update the trace report; fall back to the file's own endpoint if no local site is known
        localsite = localsite or fspec.ddmendpoint
        trace_report.update(localSite=localsite, remoteSite=fspec.ddmendpoint, filesize=fspec.filesize)
        trace_report.update(filename=fspec.lfn, guid=fspec.guid.replace('-', ''))
        trace_report.update(scope=fspec.scope, dataset=fspec.dataset)

        # (special handling of direct access files used to live here; it is disabled and to be deprecated)

        trace_report.update(catStart=time())

        target_dir = fspec.workdir or kwargs.get('workdir') or '.'
        source = fspec.turl
        destination = os.path.join(target_dir, fspec.lfn)

        logger.info("transferring file %s from %s to %s", fspec.lfn, source, destination)
        exit_code, stdout, stderr = move(source, destination, dst_in=True, copysetup=copysetup)

        if exit_code != 0:
            logger.warning("transfer failed: exit code = %d, stdout = %s, stderr = %s", exit_code, stdout, stderr)

            failure = resolve_common_transfer_errors(stderr, is_stagein=True)
            fspec.status = 'failed'
            fspec.status_code = failure.get('rcode')
            trace_report.update(clientState=failure.get('state') or 'STAGEIN_ATTEMPT_FAILED',
                                stateReason=failure.get('error'),
                                timeEnd=time())
            trace_report.send()
            raise PilotException(failure.get('error'), code=failure.get('rcode'), state=failure.get('state'))

        # verify checksum; compare local checksum with catalog value (fspec.checksum), use same checksum type
        checksum_state, checksum_diagnostics = verify_catalog_checksum(fspec, destination)
        if checksum_diagnostics != "":
            trace_report.update(clientState=checksum_state or 'STAGEIN_ATTEMPT_FAILED',
                                stateReason=checksum_diagnostics,
                                timeEnd=time())
            trace_report.send()
            raise PilotException(checksum_diagnostics, code=fspec.status_code, state=checksum_state)

        # transfer and verification both succeeded
        fspec.status_code = 0
        fspec.status = 'transferred'
        trace_report.update(clientState='DONE', stateReason='OK', timeEnd=time())
        trace_report.send()

    return files
def copy_out(files, **kwargs):
    """
    Upload given files using lsm copytool.

    Each file is uploaded with lsm-put; the file size, space token, adler32 checksum and guid are
    handed over to the command as options. The trace report is updated and sent for every file,
    both on failure and on success.

    :param files: list of `FileSpec` objects.
    :param kwargs: 'copytools', 'trace_report', 'ddmconf' (required) and 'workdir' are read from here.
    :raise: PilotException in case of controlled error.
    :return: files (list of `FileSpec` objects) with status and status_code updated.
    """

    copytools = kwargs.get('copytools') or []
    copysetup = get_copysetup(copytools, 'lsm')
    trace_report = kwargs.get('trace_report')
    ddmconf = kwargs.get('ddmconf', None)
    if not ddmconf:
        raise PilotException("copy_out() failed to resolve ddmconf from function arguments",
                             code=ErrorCodes.STAGEOUTFAILED, state='COPY_ERROR')

    for fspec in files:
        trace_report.update(scope=fspec.scope, dataset=fspec.dataset, url=fspec.surl, filesize=fspec.filesize)
        trace_report.update(catStart=time(), filename=fspec.lfn, guid=fspec.guid.replace('-', ''))

        # resolve token value from fspec.ddmendpoint; an endpoint missing from ddmconf is treated like
        # a missing token so that it ends in the controlled failure below instead of an AttributeError
        ddm = ddmconf.get(fspec.ddmendpoint)
        token = ddm.token if ddm is not None else None
        if not token:
            diagnostics = "copy_out() failed to resolve token value for ddmendpoint=%s" % (fspec.ddmendpoint)
            trace_report.update(clientState='STAGEOUT_ATTEMPT_FAILED',
                                stateReason=diagnostics,
                                timeEnd=time())
            trace_report.send()
            raise PilotException(diagnostics, code=ErrorCodes.STAGEOUTFAILED, state='COPY_ERROR')

        src = fspec.workdir or kwargs.get('workdir') or '.'
        source = os.path.join(src, fspec.lfn)
        destination = fspec.turl

        # checksum has been calculated in the previous step - transfer_files() in api/data
        # note: pilot is handing over checksum to the command - which will/should verify it after the transfer
        checksum = "adler32:%s" % fspec.checksum.get('adler32')

        # define the command options (dict.items() is available on both Python 2 and Python 3)
        opts = {'--size': fspec.filesize, '-t': token, '--checksum': checksum, '--guid': fspec.guid}
        options = " ".join("%s %s" % (key, value) for key, value in opts.items())

        logger.info("transferring file %s from %s to %s", fspec.lfn, source, destination)

        nretries = 1  # input parameter to function?
        for _retry in range(nretries):
            exit_code, stdout, stderr = move(source, destination, dst_in=False, copysetup=copysetup, options=options)

            if exit_code != 0:
                # fall back to stdout when the command wrote nothing to stderr
                if stderr == "":
                    stderr = stdout
                error = resolve_common_transfer_errors(stderr, is_stagein=False)

                # copy_in() reads the error code from the 'rcode' key of this dictionary; 'exit_code' is
                # kept as a fallback, and the generic stage-out code is used if neither key is set
                # NOTE(review): confirm the key name against resolve_common_transfer_errors()
                rcode = error.get('rcode') or error.get('exit_code') or ErrorCodes.STAGEOUTFAILED

                fspec.status = 'failed'
                fspec.status_code = rcode
                trace_report.update(clientState=error.get('state', None) or 'STAGEOUT_ATTEMPT_FAILED',
                                    stateReason=error.get('error', 'unknown error'),
                                    timeEnd=time())
                trace_report.send()
                raise PilotException(error.get('error'), code=rcode, state=error.get('state'))
            else:  # all successful
                logger.info('all successful')
                break

        fspec.status_code = 0
        fspec.status = 'transferred'
        trace_report.update(clientState='DONE', stateReason='OK', timeEnd=time())
        trace_report.send()

    return files
def copy_out_old(files):
    """
    Upload the given files by calling lsm-put directly (legacy entry point).

    :param files: Files to upload
    :raises PilotException: StageOutFailure if the lsm-put tool is not found or a transfer fails
    """

    lsm_available = check_for_lsm(dst_in=False)
    if not lsm_available:
        raise StageOutFailure("No LSM tools found")

    exit_code, stdout, _stderr = move_all_files_out(files)
    if exit_code == 0:
        return

    # the transfer failed; stdout of the last attempt is used as the failure message
    raise StageOutFailure(stdout)
def move_all_files_in(files, nretries=1):
    """
    Stage in all given files with lsm-get, stopping at the first file that fails.

    :param files: list of dictionaries with format {'name':<filename>, 'source':<dir>, 'destination':<dir>}
    :param nretries: number of retries; sometimes there can be a timeout copying, but the next attempt may succeed
    :return: exit_code, stdout, stderr
    """

    exit_code, stdout, stderr = 0, "", ""
    timeout_codes = (errno.ETIMEDOUT, errno.ETIME)

    for entry in files:
        name = entry['name']
        logger.info("transferring file %s from %s to %s", name, entry['source'], entry['destination'])

        source = entry['source'] + '/' + name
        destination = os.path.join(entry['destination'], name)

        for attempt in range(nretries):
            exit_code, stdout, stderr = move(source, destination, dst_in=True)
            if exit_code == 0:
                # all successful, continue with the next file
                break

            # only a timeout is worth another attempt, and only while attempts remain
            timed_out = exit_code in timeout_codes
            last_attempt = (attempt + 1) == nretries
            if last_attempt or not timed_out:
                logger.warning("transfer failed: exit code = %d, stdout = %s, stderr = %s", exit_code, stdout, stderr)
                return exit_code, stdout, stderr

    return exit_code, stdout, stderr
def move_all_files_out(files, nretries=1):
    """
    Stage out all given files with lsm-put, stopping at the first file that fails.

    :param files: list of dictionaries with format {'name':<filename>, 'source':<dir>, 'destination':<dir>}
    :param nretries: number of attempts per file; only timeouts lead to another attempt
    :return: exit_code, stdout, stderr
    """

    exit_code, stdout, stderr = 0, "", ""
    timeout_codes = (errno.ETIMEDOUT, errno.ETIME)

    for entry in files:
        name = entry['name']
        logger.info("transferring file %s from %s to %s", name, entry['source'], entry['destination'])

        destination = entry['destination'] + '/' + name
        source = os.path.join(entry['source'], name)

        for attempt in range(nretries):
            exit_code, stdout, stderr = move(source, destination, dst_in=False)
            if exit_code == 0:
                # all successful, continue with the next file
                break

            # only a timeout is worth another attempt, and only while attempts remain
            timed_out = exit_code in timeout_codes
            last_attempt = (attempt + 1) == nretries
            if last_attempt or not timed_out:
                logger.warning("transfer failed: exit code = %d, stdout = %s, stderr = %s", exit_code, stdout, stderr)
                return exit_code, stdout, stderr

    return exit_code, stdout, stderr
#@timeout(seconds=10800)
def move(source, destination, dst_in=True, copysetup="", options=None):
    """
    Use lsm-get or lsm-put to transfer the file.

    An exception raised while executing the command is not propagated; it is converted into a
    stage-in/stage-out failure code with the exception text as stdout.

    :param source: path to source (string).
    :param destination: path to destination (string).
    :param dst_in: True for stage-in, False for stage-out (boolean).
    :param copysetup: optional setup script that is sourced before the transfer command (string).
    :param options: optional, already formatted command options placed before source and destination (string).
    :return: exit code, stdout, stderr
    """

    # copysetup = '/osg/mwt2/app/atlas_app/atlaswn/setup.sh'
    # an empty or missing copysetup means that nothing is sourced
    cmd = 'source %s;' % copysetup if copysetup else ''

    args = "%s %s" % (source, destination)
    if options:
        args = "%s %s" % (options, args)

    tool = "lsm-get" if dst_in else "lsm-put"
    cmd += "%s %s" % (tool, args)

    try:
        exit_code, stdout, stderr = execute(cmd, usecontainer=False, copytool=True)
    except Exception as error:
        exit_code = ErrorCodes.STAGEINFAILED if dst_in else ErrorCodes.STAGEOUTFAILED
        # the format string needs a conversion specifier, otherwise the % operator raises a TypeError
        # inside this handler and hides the original problem
        stdout = 'exception caught: %s' % error
        stderr = ''
        logger.warning(stdout)

    logger.info('exit_code=%d, stdout=%s, stderr=%s', exit_code, stdout, stderr)
    return exit_code, stdout, stderr
def check_for_lsm(dst_in=True):
    """
    Check with `which` that the needed LSM command line tool can be found.

    :param dst_in: True to look for lsm-get, False to look for lsm-put (boolean).
    :return: True if the `which` command returned exit code 0 (boolean).
    """

    cmd = 'which lsm-get' if dst_in else 'which lsm-put'
    exit_code, _tool_path, _ = execute(cmd)
    found = exit_code == 0
    return found