#!/usr/bin/env python
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
#
# Authors:
# - Wen Guan, wen.guan@cern.ch, 2017-2018
# - Paul Nilsson, paul.nilsson@cern.ch, 2017-2019
"""
Exceptions in pilot
"""
import time
import threading
import traceback
from sys import exc_info, version_info
from .errorcodes import ErrorCodes
errors = ErrorCodes()
def is_python3():
    """
    Tell whether the running interpreter is Python 3 or newer.

    :return: True when the interpreter major version is at least 3, otherwise False (boolean).
    """
    major_version = version_info[0]
    return major_version >= 3
class PilotException(Exception):
    """
    The basic exception class.
    The pilot error code can be defined here, where the pilot error code will
    be propagated to job server.
    """

    def __init__(self, *args, **kwargs):
        """
        Store the detail arguments and resolve the error code and its message.

        :param args: optional detail values (e.g. a reason string) shown by __str__().
        :param kwargs: optional keyword values; 'code' selects the error code, and the full
                       dictionary is used to %-format the error message.
        """
        # Tolerate the legacy calling convention where a subclass forwards its arguments
        # packed as one tuple plus one dict, i.e. __init__(args, kwargs); unpack them so
        # self.args / self.kwargs hold the values the caller actually supplied
        if not kwargs and len(args) == 2 and isinstance(args[0], tuple) and isinstance(args[1], dict):
            args, kwargs = args[0], args[1]
        super(PilotException, self).__init__(*args)
        self.args = args
        self.kwargs = kwargs
        code = self.kwargs.get('code', None)
        if code:
            self._errorCode = code
        else:
            self._errorCode = errors.UNKNOWNEXCEPTION
        self._message = errors.get_error_message(self._errorCode)
        self._error_string = None
        # traceback of the exception being handled (if any) at construction time
        self._stack_trace = "%s" % traceback.format_exc()

    def _format_core(self):
        """
        Build the 'error code / message' part shared by __str__() and get_detail().

        :return: formatted string.
        """
        try:
            return "error code: %s, message: %s" % (self._errorCode, self._message % self.kwargs)
        except Exception:
            # at least get the core message out if something happened
            return "error code: %s, message: %s" % (self._errorCode, self._message)

    def __str__(self):
        """
        Return the error code and message, followed by any non-empty detail arguments.

        :return: string representation (also cached in self._error_string).
        """
        self._error_string = self._format_core()
        if len(self.args) > 0:
            # If there is a non-kwarg parameter, assume it's the error
            # message or reason description and tack it on to the end
            # of the exception message
            try:
                # wrap each argument in a one-element tuple so that tuple arguments are
                # rendered as a whole instead of being consumed as format operands
                details = ["%s" % (arg,) for arg in self.args if arg]
            except Exception:
                details = ["{}".format(self.args)]
            if details:
                # skip the section entirely when every argument was empty
                self._error_string = self._error_string + "\ndetails: %s" % '\n'.join(details)
        return self._error_string.strip()

    def get_detail(self):
        """
        Return the error code and message together with the stored stack trace.

        :return: string.
        """
        self._error_string = self._format_core()
        return self._error_string + "\nstacktrace: %s" % self._stack_trace

    def get_error_code(self):
        """
        Return the error code of this exception.

        :return: error code.
        """
        return self._errorCode

    def get_last_error(self):
        """
        Return the last positional argument if any was given, otherwise the error message.

        :return: last argument or error message.
        """
        if self.args:
            return self.args[-1]
        return self._message
#class NotImplementedError(PilotException):
# """
# Not implemented exception.
# """
# def __init__(self, *args, **kwargs):
# super(NotImplementedError, self).__init__(args, kwargs)
# self._errorCode = errors.NOTIMPLEMENTED
# self._message = errors.get_error_message(self._errorCode)
class UnknownException(PilotException):
    """
    Unknown exception.
    """

    def __init__(self, *args, **kwargs):
        """
        Create the exception and pin its error code to errors.UNKNOWNEXCEPTION.

        :param args: optional detail values (e.g. a reason string) forwarded to PilotException.
        :param kwargs: optional keyword values forwarded to PilotException.
        """
        # unpack when forwarding; passing the packed tuple and dict positionally would make
        # the base class store them as two detail arguments and leave its kwargs empty
        super(UnknownException, self).__init__(*args, **kwargs)
        self._errorCode = errors.UNKNOWNEXCEPTION
        self._message = errors.get_error_message(self._errorCode)
class NoLocalSpace(PilotException):
    """
    Not enough local space.
    """

    def __init__(self, *args, **kwargs):
        """
        Create the exception and pin its error code to errors.NOLOCALSPACE.

        :param args: optional detail values (e.g. a reason string) forwarded to PilotException.
        :param kwargs: optional keyword values forwarded to PilotException.
        """
        # unpack when forwarding; passing the packed tuple and dict positionally would make
        # the base class store them as two detail arguments and leave its kwargs empty
        super(NoLocalSpace, self).__init__(*args, **kwargs)
        self._errorCode = errors.NOLOCALSPACE
        self._message = errors.get_error_message(self._errorCode)
class StageInFailure(PilotException):
    """
    Failed to stage-in file.
    """

    def __init__(self, *args, **kwargs):
        """
        Create the exception and pin its error code to errors.STAGEINFAILED.

        :param args: optional detail values (e.g. a reason string) forwarded to PilotException.
        :param kwargs: optional keyword values forwarded to PilotException.
        """
        # unpack when forwarding; passing the packed tuple and dict positionally would make
        # the base class store them as two detail arguments and leave its kwargs empty
        super(StageInFailure, self).__init__(*args, **kwargs)
        self._errorCode = errors.STAGEINFAILED
        self._message = errors.get_error_message(self._errorCode)
class StageOutFailure(PilotException):
    """
    Failed to stage-out file.
    """

    def __init__(self, *args, **kwargs):
        """
        Create the exception and pin its error code to errors.STAGEOUTFAILED.

        :param args: optional detail values (e.g. a reason string) forwarded to PilotException.
        :param kwargs: optional keyword values forwarded to PilotException.
        """
        # unpack when forwarding; passing the packed tuple and dict positionally would make
        # the base class store them as two detail arguments and leave its kwargs empty
        super(StageOutFailure, self).__init__(*args, **kwargs)
        self._errorCode = errors.STAGEOUTFAILED
        self._message = errors.get_error_message(self._errorCode)
class SetupFailure(PilotException):
    """
    Failed to setup environment.
    """

    def __init__(self, *args, **kwargs):
        """
        Create the exception and pin its error code to errors.SETUPFAILURE.

        :param args: optional detail values (e.g. a reason string) forwarded to PilotException.
        :param kwargs: optional keyword values forwarded to PilotException.
        """
        # unpack when forwarding; passing the packed tuple and dict positionally would make
        # the base class store them as two detail arguments and leave its kwargs empty
        super(SetupFailure, self).__init__(*args, **kwargs)
        self._errorCode = errors.SETUPFAILURE
        self._message = errors.get_error_message(self._errorCode)
class RunPayloadFailure(PilotException):
    """
    Failed to execute payload.
    """

    def __init__(self, *args, **kwargs):
        """
        Create the exception and pin its error code to errors.PAYLOADEXECUTIONFAILURE.

        :param args: optional detail values (e.g. a reason string) forwarded to PilotException.
        :param kwargs: optional keyword values forwarded to PilotException.
        """
        # unpack when forwarding; passing the packed tuple and dict positionally would make
        # the base class store them as two detail arguments and leave its kwargs empty
        super(RunPayloadFailure, self).__init__(*args, **kwargs)
        self._errorCode = errors.PAYLOADEXECUTIONFAILURE
        self._message = errors.get_error_message(self._errorCode)
class MessageFailure(PilotException):
    """
    Failed to handle messages.
    """

    def __init__(self, *args, **kwargs):
        """
        Create the exception and pin its error code to errors.MESSAGEHANDLINGFAILURE.

        :param args: optional detail values (e.g. a reason string) forwarded to PilotException.
        :param kwargs: optional keyword values forwarded to PilotException.
        """
        # unpack when forwarding; passing the packed tuple and dict positionally would make
        # the base class store them as two detail arguments and leave its kwargs empty
        super(MessageFailure, self).__init__(*args, **kwargs)
        self._errorCode = errors.MESSAGEHANDLINGFAILURE
        self._message = errors.get_error_message(self._errorCode)
class CommunicationFailure(PilotException):
    """
    Failed to communicate with servers such as Panda, Harvester, ACT and so on.
    """

    def __init__(self, *args, **kwargs):
        """
        Create the exception and pin its error code to errors.COMMUNICATIONFAILURE.

        :param args: optional detail values (e.g. a reason string) forwarded to PilotException.
        :param kwargs: optional keyword values forwarded to PilotException.
        """
        # unpack when forwarding; passing the packed tuple and dict positionally would make
        # the base class store them as two detail arguments and leave its kwargs empty
        super(CommunicationFailure, self).__init__(*args, **kwargs)
        self._errorCode = errors.COMMUNICATIONFAILURE
        self._message = errors.get_error_message(self._errorCode)
class FileHandlingFailure(PilotException):
    """
    Failed during file handling.
    """

    def __init__(self, *args, **kwargs):
        """
        Create the exception and pin its error code to errors.FILEHANDLINGFAILURE.

        :param args: optional detail values (e.g. a reason string) forwarded to PilotException.
        :param kwargs: optional keyword values forwarded to PilotException.
        """
        # unpack when forwarding; passing the packed tuple and dict positionally would make
        # the base class store them as two detail arguments and leave its kwargs empty
        super(FileHandlingFailure, self).__init__(*args, **kwargs)
        self._errorCode = errors.FILEHANDLINGFAILURE
        self._message = errors.get_error_message(self._errorCode)
class NoSuchFile(PilotException):
    """
    No such file or directory.
    """

    def __init__(self, *args, **kwargs):
        """
        Create the exception and pin its error code to errors.NOSUCHFILE.

        :param args: optional detail values (e.g. a reason string) forwarded to PilotException.
        :param kwargs: optional keyword values forwarded to PilotException.
        """
        # unpack when forwarding; passing the packed tuple and dict positionally would make
        # the base class store them as two detail arguments and leave its kwargs empty
        super(NoSuchFile, self).__init__(*args, **kwargs)
        self._errorCode = errors.NOSUCHFILE
        self._message = errors.get_error_message(self._errorCode)
class ConversionFailure(PilotException):
    """
    Failed to convert object data.
    """

    def __init__(self, *args, **kwargs):
        """
        Create the exception and pin its error code to errors.CONVERSIONFAILURE.

        :param args: optional detail values (e.g. a reason string) forwarded to PilotException.
        :param kwargs: optional keyword values forwarded to PilotException.
        """
        # unpack when forwarding; passing the packed tuple and dict positionally would make
        # the base class store them as two detail arguments and leave its kwargs empty
        super(ConversionFailure, self).__init__(*args, **kwargs)
        self._errorCode = errors.CONVERSIONFAILURE
        self._message = errors.get_error_message(self._errorCode)
class MKDirFailure(PilotException):
    """
    Failed to create local directory.
    """

    def __init__(self, *args, **kwargs):
        """
        Create the exception and pin its error code to errors.MKDIR.

        :param args: optional detail values (e.g. a reason string) forwarded to PilotException.
        :param kwargs: optional keyword values forwarded to PilotException.
        """
        # unpack when forwarding; passing the packed tuple and dict positionally would make
        # the base class store them as two detail arguments and leave its kwargs empty
        super(MKDirFailure, self).__init__(*args, **kwargs)
        self._errorCode = errors.MKDIR
        self._message = errors.get_error_message(self._errorCode)
class NoGridProxy(PilotException):
    """
    Grid proxy not valid.
    """

    def __init__(self, *args, **kwargs):
        """
        Create the exception and pin its error code to errors.NOPROXY.

        :param args: optional detail values (e.g. a reason string) forwarded to PilotException.
        :param kwargs: optional keyword values forwarded to PilotException.
        """
        # unpack when forwarding; passing the packed tuple and dict positionally would make
        # the base class store them as two detail arguments and leave its kwargs empty
        super(NoGridProxy, self).__init__(*args, **kwargs)
        self._errorCode = errors.NOPROXY
        self._message = errors.get_error_message(self._errorCode)
class NoVomsProxy(PilotException):
    """
    Voms proxy not valid.
    """

    def __init__(self, *args, **kwargs):
        """
        Create the exception and pin its error code to errors.NOVOMSPROXY.

        :param args: optional detail values (e.g. a reason string) forwarded to PilotException.
        :param kwargs: optional keyword values forwarded to PilotException.
        """
        # unpack when forwarding; passing the packed tuple and dict positionally would make
        # the base class store them as two detail arguments and leave its kwargs empty
        super(NoVomsProxy, self).__init__(*args, **kwargs)
        self._errorCode = errors.NOVOMSPROXY
        self._message = errors.get_error_message(self._errorCode)
class TrfDownloadFailure(PilotException):
    """
    Transform could not be downloaded.
    """

    def __init__(self, *args, **kwargs):
        """
        Create the exception and pin its error code to errors.TRFDOWNLOADFAILURE.

        :param args: optional detail values (e.g. a reason string) forwarded to PilotException.
        :param kwargs: optional keyword values forwarded to PilotException.
        """
        # unpack when forwarding; passing the packed tuple and dict positionally would make
        # the base class store them as two detail arguments and leave its kwargs empty
        super(TrfDownloadFailure, self).__init__(*args, **kwargs)
        self._errorCode = errors.TRFDOWNLOADFAILURE
        self._message = errors.get_error_message(self._errorCode)
class NotDefined(PilotException):
    """
    Not defined exception.
    """

    def __init__(self, *args, **kwargs):
        """
        Create the exception and pin its error code to errors.NOTDEFINED.

        :param args: optional detail values (e.g. a reason string) forwarded to PilotException.
        :param kwargs: optional keyword values forwarded to PilotException.
        """
        # unpack when forwarding; passing the packed tuple and dict positionally would make
        # the base class store them as two detail arguments and leave its kwargs empty
        super(NotDefined, self).__init__(*args, **kwargs)
        self._errorCode = errors.NOTDEFINED
        self._message = errors.get_error_message(self._errorCode)
class NotSameLength(PilotException):
    """
    Not same length exception.
    """

    def __init__(self, *args, **kwargs):
        """
        Create the exception and pin its error code to errors.NOTSAMELENGTH.

        :param args: optional detail values (e.g. a reason string) forwarded to PilotException.
        :param kwargs: optional keyword values forwarded to PilotException.
        """
        # unpack when forwarding; passing the packed tuple and dict positionally would make
        # the base class store them as two detail arguments and leave its kwargs empty
        super(NotSameLength, self).__init__(*args, **kwargs)
        self._errorCode = errors.NOTSAMELENGTH
        self._message = errors.get_error_message(self._errorCode)
class ESRecoverable(PilotException):
    """
    Eventservice recoverable exception.
    """

    def __init__(self, *args, **kwargs):
        """
        Create the exception and pin its error code to errors.ESRECOVERABLE.

        :param args: optional detail values (e.g. a reason string) forwarded to PilotException.
        :param kwargs: optional keyword values forwarded to PilotException.
        """
        # unpack when forwarding; passing the packed tuple and dict positionally would make
        # the base class store them as two detail arguments and leave its kwargs empty
        super(ESRecoverable, self).__init__(*args, **kwargs)
        self._errorCode = errors.ESRECOVERABLE
        self._message = errors.get_error_message(self._errorCode)
class ESFatal(PilotException):
    """
    Eventservice fatal exception.
    """

    def __init__(self, *args, **kwargs):
        """
        Create the exception and pin its error code to errors.ESFATAL.

        :param args: optional detail values (e.g. a reason string) forwarded to PilotException.
        :param kwargs: optional keyword values forwarded to PilotException.
        """
        # unpack when forwarding; passing the packed tuple and dict positionally would make
        # the base class store them as two detail arguments and leave its kwargs empty
        super(ESFatal, self).__init__(*args, **kwargs)
        self._errorCode = errors.ESFATAL
        self._message = errors.get_error_message(self._errorCode)
class ExecutedCloneJob(PilotException):
    """
    Clone job executed exception.
    """

    def __init__(self, *args, **kwargs):
        """
        Create the exception and pin its error code to errors.EXECUTEDCLONEJOB.

        :param args: optional detail values (e.g. a reason string) forwarded to PilotException.
        :param kwargs: optional keyword values forwarded to PilotException.
        """
        # unpack when forwarding; passing the packed tuple and dict positionally would make
        # the base class store them as two detail arguments and leave its kwargs empty
        super(ExecutedCloneJob, self).__init__(*args, **kwargs)
        self._errorCode = errors.EXECUTEDCLONEJOB
        self._message = errors.get_error_message(self._errorCode)
class ESNoEvents(PilotException):
    """
    Eventservice no events exception.
    """

    def __init__(self, *args, **kwargs):
        """
        Create the exception and pin its error code to errors.ESNOEVENTS.

        :param args: optional detail values (e.g. a reason string) forwarded to PilotException.
        :param kwargs: optional keyword values forwarded to PilotException.
        """
        # unpack when forwarding; passing the packed tuple and dict positionally would make
        # the base class store them as two detail arguments and leave its kwargs empty
        super(ESNoEvents, self).__init__(*args, **kwargs)
        self._errorCode = errors.ESNOEVENTS
        self._message = errors.get_error_message(self._errorCode)
class ExceededMaxWaitTime(PilotException):
    """
    Exceeded maximum waiting time (after abort_job has been set).
    """

    def __init__(self, *args, **kwargs):
        """
        Create the exception and pin its error code to errors.EXCEEDEDMAXWAITTIME.

        :param args: optional detail values (e.g. a reason string) forwarded to PilotException.
        :param kwargs: optional keyword values forwarded to PilotException.
        """
        # unpack when forwarding; passing the packed tuple and dict positionally would make
        # the base class store them as two detail arguments and leave its kwargs empty
        super(ExceededMaxWaitTime, self).__init__(*args, **kwargs)
        self._errorCode = errors.EXCEEDEDMAXWAITTIME
        self._message = errors.get_error_message(self._errorCode)
class BadXML(PilotException):
    """
    Badly formed XML.
    """

    def __init__(self, *args, **kwargs):
        """
        Create the exception and pin its error code to errors.BADXML.

        :param args: optional detail values (e.g. a reason string) forwarded to PilotException.
        :param kwargs: optional keyword values forwarded to PilotException.
        """
        # unpack when forwarding; passing the packed tuple and dict positionally would make
        # the base class store them as two detail arguments and leave its kwargs empty
        super(BadXML, self).__init__(*args, **kwargs)
        self._errorCode = errors.BADXML
        self._message = errors.get_error_message(self._errorCode)
class NoSoftwareDir(PilotException):
    """
    Software applications directory does not exist.
    """

    def __init__(self, *args, **kwargs):
        """
        Create the exception and pin its error code to errors.NOSOFTWAREDIR.

        :param args: optional detail values (e.g. a reason string) forwarded to PilotException.
        :param kwargs: optional keyword values forwarded to PilotException.
        """
        # unpack when forwarding; passing the packed tuple and dict positionally would make
        # the base class store them as two detail arguments and leave its kwargs empty
        super(NoSoftwareDir, self).__init__(*args, **kwargs)
        self._errorCode = errors.NOSOFTWAREDIR
        self._message = errors.get_error_message(self._errorCode)
class LogFileCreationFailure(PilotException):
    """
    Log file could not be created.
    """

    def __init__(self, *args, **kwargs):
        """
        Create the exception and pin its error code to errors.LOGFILECREATIONFAILURE.

        :param args: optional detail values (e.g. a reason string) forwarded to PilotException.
        :param kwargs: optional keyword values forwarded to PilotException.
        """
        # unpack when forwarding; passing the packed tuple and dict positionally would make
        # the base class store them as two detail arguments and leave its kwargs empty
        super(LogFileCreationFailure, self).__init__(*args, **kwargs)
        self._errorCode = errors.LOGFILECREATIONFAILURE
        self._message = errors.get_error_message(self._errorCode)
class QueuedataFailure(PilotException):
    """
    Failed to download queuedata.
    """

    def __init__(self, *args, **kwargs):
        """
        Create the exception and pin its error code to errors.QUEUEDATA.

        :param args: optional detail values (e.g. a reason string) forwarded to PilotException.
        :param kwargs: optional keyword values forwarded to PilotException.
        """
        # unpack when forwarding; passing the packed tuple and dict positionally would make
        # the base class store them as two detail arguments and leave its kwargs empty
        super(QueuedataFailure, self).__init__(*args, **kwargs)
        self._errorCode = errors.QUEUEDATA
        self._message = errors.get_error_message(self._errorCode)
class QueuedataNotOK(PilotException):
    """
    Corrupt queuedata.
    """

    def __init__(self, *args, **kwargs):
        """
        Create the exception and pin its error code to errors.QUEUEDATANOTOK.

        :param args: optional detail values (e.g. a reason string) forwarded to PilotException.
        :param kwargs: optional keyword values forwarded to PilotException.
        """
        # unpack when forwarding; passing the packed tuple and dict positionally would make
        # the base class store them as two detail arguments and leave its kwargs empty
        super(QueuedataNotOK, self).__init__(*args, **kwargs)
        self._errorCode = errors.QUEUEDATANOTOK
        self._message = errors.get_error_message(self._errorCode)
class ReplicasNotFound(PilotException):
    """
    No matching replicas were found in list_replicas() output.
    """

    def __init__(self, *args, **kwargs):
        """
        Create the exception and pin its error code to errors.NOREPLICAS.

        :param args: optional detail values (e.g. a reason string) forwarded to PilotException.
        :param kwargs: optional keyword values forwarded to PilotException.
        """
        # unpack when forwarding; passing the packed tuple and dict positionally would make
        # the base class store them as two detail arguments and leave its kwargs empty
        super(ReplicasNotFound, self).__init__(*args, **kwargs)
        self._errorCode = errors.NOREPLICAS
        self._message = errors.get_error_message(self._errorCode)
class JobAlreadyRunning(PilotException):
    """
    Job is already running elsewhere.
    """

    def __init__(self, *args, **kwargs):
        """
        Create the exception and pin its error code to errors.JOBALREADYRUNNING.

        :param args: optional detail values (e.g. a reason string) forwarded to PilotException.
        :param kwargs: optional keyword values forwarded to PilotException; an optional 'timeout'
                       value is remembered and reported by __str__().
        """
        # unpack when forwarding; passing the packed tuple and dict positionally would make
        # the base class store them as two detail arguments and leave its kwargs empty
        super(JobAlreadyRunning, self).__init__(*args, **kwargs)
        self._errorCode = errors.JOBALREADYRUNNING
        self._message = errors.get_error_message(self._errorCode)
        # __str__() reports a timeout; define it here so that str() can never fail with
        # an AttributeError when the caller did not supply one
        self._timeout = kwargs.get('timeout', None)

    def __str__(self):
        """
        Return the class name and message, plus the timeout (when known) and the arguments (when given).

        :return: string representation.
        """
        text = "%s: %s" % (self.__class__.__name__, self._message)
        timeout = getattr(self, '_timeout', None)
        if timeout is not None:
            text += ", timeout=%s seconds" % (timeout,)
        if self.args:
            text += ' : %s' % repr(self.args)
        return text
class ExcThread(threading.Thread):
    """
    Support class that allows for catching exceptions in threads.
    """

    def __init__(self, bucket, target, kwargs, name):
        """
        Init function with a bucket that can be used to communicate exceptions to the caller.
        The bucket is a Queue.queue() or queue.Queue() object that can hold an exception thrown by a thread.

        :param bucket: queue based bucket.
        :param target: target function to execute.
        :param kwargs: target function options.
        :param name: name of the thread.
        """
        threading.Thread.__init__(self, target=target, kwargs=kwargs, name=name)
        self.name = name
        self.bucket = bucket

    def _get_target_and_kwargs(self):
        """
        Return the target function and its options as stored by threading.Thread.

        Python 3 keeps them in the _target/_kwargs attributes while Python 2 uses the
        name-mangled _Thread__target/_Thread__kwargs attributes.

        :return: target function, keyword dictionary (tuple).
        """
        try:
            return self._target, self._kwargs
        except AttributeError:
            return self._Thread__target, self._Thread__kwargs

    def run(self):
        """
        Thread run function.
        Any exceptions in the threads are caught in this function and placed in the bucket of the current thread.
        The bucket will be emptied by the control module that launched the thread. E.g. an exception is thrown in
        the retrieve thread (in function retrieve()) that is created by the job.control thread. The exception is caught
        by the run() function and placed in the bucket belonging to the retrieve thread. The bucket is emptied in
        job.control().

        :return:
        """
        target, kwargs = self._get_target_and_kwargs()
        try:
            target(**kwargs)
        except Exception:
            # capture once so the same exception info is reported and put in the bucket
            info = exc_info()
            # logger object can't be used here for some reason:
            # IOError: [Errno 2] No such file or directory: '/state/partition1/scratch/PanDA_Pilot2_*/pilotlog.txt'
            print('exception caught by thread run() function: %s' % str(info))
            print(traceback.format_exc())
            # print_tb() writes the traceback itself and returns None, so its result must not be printed
            traceback.print_tb(info[2])
            self.bucket.put(info)
            print("exception has been put in bucket queue belonging to thread \'%s\'" % self.name)
            # the target's 'args' option, when present, is expected to carry a graceful_stop event
            args = kwargs.get('args', None)
            if args:
                # the sleep is needed to allow the threads to catch up
                print('setting graceful stop in 10 s since there is no point in continuing')
                time.sleep(10)
                args.graceful_stop.set()

    def get_bucket(self):
        """
        Return the bucket object that holds any information about thrown exceptions.

        :return: bucket (Queue object)
        """
        return self.bucket