Brokerage
The brokerage is one of the most crucial functions in the system: it distributes workload among computing resources. It has the following goals:
To assign enough jobs to computing resources to utilize all available CPUs continuously.
To minimize the waiting time for each job to produce output data.
To execute jobs in such a way that the jobs respect their priorities and resource allocations.
To choose computing resources for each job based on characteristics of the job and constraints of the computing resources.
It is not straightforward to satisfy those goals for all jobs since some of them are logically contradictory. The brokerage has a plugin structure so that organizations can provide their algorithms according to their needs and use-cases.
This page explains the algorithms of some advanced plugins.
ATLAS Production Task Brokerage
The ATLAS production task brokerage assigns each task to a nucleus as follows:
Generate the primary list of nuclei in the ACTIVE status.
Filter the list with the following checks:
Nuclei are skipped if there are long transfer backlogs, unless t1Weight of the task is negative.
Nuclei must have associated storages.
The storages associated with nuclei must have enough space (a sketch of this check follows the list):
\[spaceFree + spaceExpired - normalizedExpOutSize \times RW > diskThreshold\]
where spaceFree is the free space in the associated storage, spaceExpired is the size of the space occupied by expired secondary data, normalizedExpOutSize is the expected size of the output file normalized by cpuTime × corePower × day (0.25), RW is the total amount of existing workload assigned to the nucleus, and diskThreshold is the threshold defined per gshare as DISK_THRESHOLD_<gshare> in gdpconfig.
Nuclei must pass the data locality check with the following rules:
All input datasets are considered if the taskBrokerOnMaster task parameter is set to False or is unspecified. Otherwise, only the primary input dataset is considered, i.e., secondary datasets are ignored.
Nuclei with incomplete local data can be used if the inputPreStaging task parameter is set to True, since the parameter enables the data carousel mode and guarantees input data completeness. Those nuclei ignore the next two rules on the input data size and the number of input files.
The fraction of the data size locally available divided by the total input size must be larger than INPUT_SIZE_FRACTION (defined in gdpconfig) if the total input size is larger than INPUT_SIZE_THRESHOLD (defined in gdpconfig).
The fraction of the number of files locally available divided by the total number of input files must be larger than INPUT_NUM_FRACTION (defined in gdpconfig) if the total input size is larger than INPUT_NUM_THRESHOLD (defined in gdpconfig).
The entire data locality check is disabled if no nuclei pass and either ioIntensity of the task is less than or equal to MIN_IO_INTENSITY_WITH_LOCAL_DATA and the total input size is less than or equal to MIN_INPUT_SIZE_WITH_LOCAL_DATA, or taskPriority of the task is larger than or equal to MAX_TASK_PRIO_WITH_LOCAL_DATA. MIN_blah and MAX_blah are defined in gdpconfig.
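As a minimal sketch, the storage-space check above can be expressed as follows (variable names are illustrative; the actual plugin obtains these values from the PanDA database and CRIC):

def passes_space_check(space_free, space_expired, normalized_exp_out_size,
                       rw, disk_threshold):
    # True if spaceFree + spaceExpired - normalizedExpOutSize * RW > diskThreshold
    return space_free + space_expired - normalized_exp_out_size * rw > disk_threshold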
Calculate the brokerage weight for the remaining nuclei with the following formula and choose a nucleus based on it.
When ioIntensity of the task is greater than MIN_IO_INTENSITY_WITH_LOCAL_DATA,
\[weight =\frac {localInputSize \times tapeWeight \times (spaceFree + spaceExpired)} {max(rwOffset, RW) \times totalInputSize \times spaceTotal}\]
Otherwise,
\[weight =\frac {tapeWeight \times (spaceFree + spaceExpired)} {max(rwOffset, RW) \times spaceTotal}\]
where localInputSize is the size of input data locally available, totalInputSize is the total size of input data, tapeWeight is 0.001 if input data is on tape storage or 1 otherwise, rwOffset is 50 to impose a minimum offset on RW, and spaceTotal is the total size of the storage.
If all nuclei are skipped, the task is pending for 30 min and then gets retried.
ATLAS Production Job Brokerage
Here is the ATLAS production job brokerage flow:
Generate the list of preliminary candidates from one of the following:
All queues while excluding any queue with case-insensitive ‘test’ in the name.
A list of pre-assigned queues. Unified queues are resolved to pseudo-queues. Although merge jobs are pre-assigned to avoid transferring small pre-merged files, the pre-assignment is ignored if the relevant queues have been skipped for 24 hours.
Filter out preliminary candidates that fail any of the following checks:
The queue status must be online unless the queues are pre-assigned.
Skip queues if their links to the nucleus are blocked.
Skip queues if more than NQUEUED_SAT_CAP (defined in gdpconfig) files are queued on their links to the nucleus.
Skip all queues if the number of files to be aggregated to the nucleus is larger than NQUEUED_NUC_CAP_FOR_JOBS (defined in gdpconfig).
For jobs with priority ≥ 800 or scout jobs, skip queues unless they are associated with the nucleus.
For jobs with priority ≥ 800, scout jobs, merging jobs, or pre-merged jobs, skip inactive queues (where no jobs started in the last 2 hours although there were activated jobs).
Zero share, which is defined in the fairsharepolicy field in CRIC. For example, with type=evgensimul:100% only evgen or simul jobs can be assigned, since all other job types have zero shares.
If the task ioIntensity is larger than IO_INTENSITY_CUTOFF (defined in gdpconfig), the total size of missing files must be less than SIZE_CUTOFF_TO_MOVE_INPUT (defined in gdpconfig) and the number of missing files must be less than NUM_CUTOFF_TO_MOVE_INPUT (defined in gdpconfig). I.e., a queue is skipped if it would need to transfer more input files than that.
There is a general MAX_DISKIO_DEFAULT limit in gdpconfig. It is possible to overwrite the limit for a particular queue through the maxDiskIO (in kB/sec per core) field in CRIC. The limit is applied in job brokerage: when the average diskIO per core for running jobs in a queue exceeds the limit, the next cycles of job brokerage will exclude tasks with diskIO higher than the defined limit, to progressively get the diskIO under the threshold.
CPU core count matching among site.coreCount, task.coreCount, and maxCoreCount of the task if defined.
Availability of ATLAS release/cache. This check is skipped when queues have ANY in the releases field in CRIC. If queues have AUTO in the releases field, the brokerage uses the information published in a JSON by CRIC, as explained in the Release/cache Availability Check for releases=AUTO section below.
Queues publish maximum (and minimum) memory size per core. The expected memory size of each job is estimated for each queue as
\[(baseRamCount + ramCount \times coreCount) \times compensation\]
where compensation is 0.9, avoiding sending jobs to high-memory queues when their expected memory usage is close to the lower limit. Queues are skipped if the estimated memory usage is not included in the acceptable memory ranges. (A sketch combining the memory, disk, and walltime estimates follows this list.)
Skip queues that don't support direct access to read input files from the local storage when the task is configured to use only direct access.
The disk usage for a job is estimated as
\[inputDiskCount + max (0.5 \: GB, \: outDiskCount \times nEvents \: or \: outDiskCount \times inputDiskCount) + workDiskCount\]
where inputDiskCount is the total size of job input files, a discrete function of nEvents. nEvents is the smallest number of events allowed in a single job based on the task requirements; it is used to estimate the output size by multiplying it by outDiskCount when outDiskCountUnit ends with "PerEvents", otherwise inputDiskCount is used. inputDiskCount is zero if the queue is configured to read input files directly from the local storage. maxwdir is divided by coreCount at each queue and the resulting value must be larger than the expected disk usage.
Disk size check: free space in the local storage has to be over 200 GB.
Skip blacklisted storage endpoints.
For scout or merge jobs, skip queues if their maxtime is less than 24 hours.
The estimated walltime for a job is
\[\frac {cpuTime \times nEvents} {C \times P \times cpuEfficiency} + baseTime\]
where C and P are the core count and core power of the queue, and nEvents is the same as the one used to estimate the disk usage. The estimated walltime must be between mintime and maxtime at the queue.
wnconnectivity of the queue must be consistent with the task's ipConnectivity if the task specifies it. The format of wnconnectivity and ipConnectivity is network_connectivity#ip_stack. network_connectivity of the queue is
full: to accept any tasks since outbound network connectivity is fully available,
http: to accept tasks with network_connectivity=http or none since only http access is available, or
none: to accept tasks with network_connectivity=none since no outbound network connectivity is available.
ip_stack of the queue is
IPv4: to accept tasks with ip_stack=IPv4,
IPv6: to accept tasks with ip_stack=IPv6, or
'' (unset): to accept tasks without specifying ip_stack.
Settings for event service and the dynamic number of events.
Too many transferring jobs: skip if transferring > max(transferring_limit, 2 × running), where transferring_limit is defined per site, or 2000 if undefined.
Use only the queues associated with the nucleus if the task sets t1Weight=-1 and normal jobs are being generated.
Skip queues without pilots for the last 3 hours.
If processingType=*urgent* or priority ≥ 1000, the network weight must be larger than or equal to NW_THRESHOLD × NW_WEIGHT_MULTIPLIER (both defined in gdpconfig).
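To make the three per-queue estimates concrete, here is a small sketch mirroring the memory, disk, and walltime formulas above (names, units, and the per_events flag are illustrative assumptions; the actual brokerage reads task parameters and queue limits from the database and CRIC):

GB = 1024 ** 3  # assuming sizes are handled in bytes

def estimated_ram_mb(base_ram_count, ram_count, core_count, compensation=0.9):
    # memory estimate: (baseRamCount + ramCount * coreCount) * compensation
    return (base_ram_count + ram_count * core_count) * compensation

def estimated_disk(input_disk_count, out_disk_count, n_events,
                   work_disk_count, per_events=True):
    # disk estimate: inputDiskCount + max(0.5 GB, output estimate) + workDiskCount;
    # inputDiskCount is passed as 0 for queues reading input via direct access
    out_estimate = out_disk_count * (n_events if per_events else input_disk_count)
    return input_disk_count + max(0.5 * GB, out_estimate) + work_disk_count

def estimated_walltime(cpu_time, n_events, core_count, core_power,
                       cpu_efficiency, base_time):
    # walltime estimate: cpuTime * nEvents / (C * P * cpuEfficiency) + baseTime
    return cpu_time * n_events / (core_count * core_power * cpu_efficiency) + base_time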
Calculate the brokerage weight for the remaining candidates. The initial weight is based on running vs. queued jobs. The brokerage takes the largest of the following numbers as the number of running jobs (a sketch of the full weight calculation follows this flow):
The actual number of running jobs at the queue, Rreal.
min(nBatchJob, 20) if Rreal < 20 and nBatchJob (the number of running + submitted batch workers at the PQ) > Rreal. This is mainly for bootstrapping.
numSlots if it is set to a positive number for the queue, for proactive job assignment.
The number of starting jobs if numSlots is set to zero, which is typically useful for Harvester to fetch jobs when the number of available slots dynamically changes.
The number of assigned jobs is ignored for the weight calculation and the subsequent filtering if the input for the jobs being considered is already available locally. Jobs waiting for data transfer do not block new jobs needing no transfer.
\[manyAssigned = max(1, min(2, \frac {assigned} {activated}))\]
\[weight = \frac {running + 1} {(activated + assigned + starting + defined + 10) \times manyAssigned}\]
Take data availability into consideration:
\[weight = weight \times \frac {availableSize + totalSize} {totalSize \times (numMissingFiles / 100 + 1)}\]
Apply a network weight based on the connectivity between the nucleus and the satellite, since the output files are aggregated to the nucleus:
\[weight = weight \times networkWeight\]
Apply further filters:
Skip queues if activated + starting > 2 × running.
Skip queues if defined+activated+assigned+starting > 2 × running.
If all queues are skipped, the task is pending for 1 hour. Otherwise, the remaining candidates are sorted by weight, and the best 10 candidates are taken.
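A minimal sketch of the weight calculation for one queue, assuming the job counts, data sizes, and network weight from the formulas above are already known (illustrative names only):

def job_brokerage_weight(running, activated, assigned, starting, defined,
                         available_size, total_size, num_missing_files,
                         network_weight, input_locally_available=False):
    # initial weight from running vs. queued jobs
    if input_locally_available:
        assigned = 0  # assigned jobs are ignored when no transfer is needed
    many_assigned = max(1.0, min(2.0, assigned / activated)) if activated else 1.0
    weight = (running + 1) / (
        (activated + assigned + starting + defined + 10) * many_assigned)
    # data availability
    if total_size:
        weight *= (available_size + total_size) / (
            total_size * (num_missing_files / 100.0 + 1))
    # connectivity between the satellite and the nucleus
    return weight * network_weight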
Release/cache Availability Check for releases=AUTO
Each queue publishes something like
"AGLT2": {
"cmtconfigs": [
"x86_64-centos7-gcc62-opt",
"x86_64-centos7-gcc8-opt",
"x86_64-slc6-gcc49-opt",
"x86_64-slc6-gcc62-opt",
"x86_64-slc6-gcc8-opt"
],
"containers": [
"any",
"/cvmfs"
],
"cvmfs": [
"atlas",
"nightlies"
],
"architectures": [
{
"arch": ["x86_64"],
"instr": ["avx2"],
"type": "cpu",
"vendor": ["intel","excl"]
},
{
"type": "gpu",
"vendor": ["nvidia","excl"],
"model":["kt100"]
}
],
"tags": [
{
"cmtconfig": "x86_64-slc6-gcc62-opt",
"container_name": "",
"project": "AthDerivation",
"release": "21.2.2.0",
"sources": [],
"tag": "VO-atlas-AthDerivation-21.2.2.0-x86_64-slc6-gcc62-opt"
},
{
"cmtconfig": "x86_64-slc6-gcc62-opt",
"container_name": "",
"project": "Athena",
"release": "21.0.38",
"sources": [],
"tag": "VO-atlas-Athena-21.0.38-x86_64-slc6-gcc62-opt"
}
]
}
Checks for CPU and/or GPU Hardware
The format of the task architecture is sw_platform<@base_platform><#host_cpu_spec><&host_gpu_spec>, where host_cpu_spec is architecture<-vendor<-instruction_set>> and host_gpu_spec is vendor<-model>.
If host_cpu_spec or host_gpu_spec is specified, the architectures field of the queue is checked. The architectures field can contain two dictionaries to describe CPU and GPU hardware specifications. All attributes of the dictionaries except for the type attribute take lists of strings. If 'attribute': ['blah'], the queue accepts tasks with attribute='blah' or without specifying the attribute. If 'excl' is included in the list, the queue accepts only tasks with attribute='blah'.
For example, tasks with #x86_64 are accepted by queues with "arch": ["x86_64"], "arch": [""], or "arch": ["x86_64", "excl"], but not by "arch": ["arm64"].
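A small sketch of the per-attribute matching rule described above (a hypothetical helper, not the brokerage's actual function):

def attribute_matches(task_value, queue_values):
    # check one CPU/GPU attribute (e.g. arch or vendor) of a task against the
    # list published by the queue, following the 'excl' rule above
    exclusive = "excl" in queue_values
    allowed = [v for v in queue_values if v not in ("excl", "")]
    if not task_value:
        # task does not specify the attribute: accepted unless the queue is exclusive
        return not exclusive
    return not allowed or task_value in allowed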
Checks for Fat Containers
If the task uses a container, i.e., the container_name attribute is set, the brokerage checks as follows:
If the task uses only tags, i.e., it sets onlyTagsForFC, the container_name must be equal to the container_name of a tag in the tags list or must be included in the sources of a tag in the tags list.
If the task doesn't set onlyTagsForFC,
'any' or '/cvmfs' must be included in the containers list, or
container_name must be forward-matched with one of the strings in the containers list, or
container_name is resolved to the source path using the dictionary of the "ALL" queue, and the resolved source path must be forward-matched with one of the strings in the containers list.
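The container check can be sketched as follows, assuming the per-queue payload shown above has been loaded into a dict named queue_info and interpreting "forward-matched" as a prefix match; resolve_source_path stands for the lookup in the "ALL" queue dictionary and is hypothetical:

def container_allowed(container_name, only_tags_for_fc, queue_info,
                      resolve_source_path=lambda name: None):
    tags = queue_info.get("tags", [])
    containers = queue_info.get("containers", [])
    if only_tags_for_fc:
        # the container must be known through the tags list
        return any(container_name == t.get("container_name")
                   or container_name in t.get("sources", []) for t in tags)
    if "any" in containers or "/cvmfs" in containers:
        return True
    if any(container_name.startswith(c) for c in containers):
        return True
    source = resolve_source_path(container_name)  # via the "ALL" queue dictionary
    return bool(source) and any(source.startswith(c) for c in containers)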
Checks for Releases, Caches, or Nightlies
Checks are as follows for releases, caches, and nightlies:
'any' or cvmfs_tag must be included in the cvmfs list, where cvmfs_tag is atlas for standard releases and caches, or nightlies for nightlies. In addition,
'any' or '/cvmfs' must be included in the containers list, or
the task sw_platform is extracted from the task architecture and must be included in the cmtconfigs list.
If the above is not the case,
'any' must be in the containers list or base_platform in the task architecture must be empty, and
the task sw_platform, sw_project, and sw_version must be equal to cmtconfig, project, and release of a tag in the tags list.
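A sketch of these checks, under the reading that the cvmfs condition and one of the containers/cmtconfigs conditions must both hold, with the tag-based check as the fallback (queue_info is the per-queue payload shown above; names are illustrative):

def release_available(sw_platform, sw_project, sw_version, base_platform,
                      is_nightly, queue_info):
    cvmfs = queue_info.get("cvmfs", [])
    containers = queue_info.get("containers", [])
    cmtconfigs = queue_info.get("cmtconfigs", [])
    tags = queue_info.get("tags", [])
    cvmfs_tag = "nightlies" if is_nightly else "atlas"
    if ("any" in cvmfs or cvmfs_tag in cvmfs) and (
            "any" in containers or "/cvmfs" in containers
            or sw_platform in cmtconfigs):
        return True
    # fall back to the explicit tags list
    return ("any" in containers or not base_platform) and any(
        t.get("cmtconfig") == sw_platform and t.get("project") == sw_project
        and t.get("release") == sw_version for t in tags)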
Network Weight
The network data sources are
the Network Weather Service as the dynamic source, and
the CRIC closeness as a semi-static source.
Given the accuracy of the data and the time lapse from decision to action, the network weight only aims to provide a simple, dynamic classification of links. It is calculated from a queued weight and a throughput weight, which are derived as shown in the plots below:
[Plot: queuedWeight]
[Plot: throughputWeight]
It uses the most recent available data: preferably data from the last hour, otherwise from the last day, otherwise from the last week. FTS Mbps values are used, which are filled from the Chicago ElasticSearch. If no network metrics are available, the AGIS closeness (0 = best to 11 = worst) is used in a normalized way.
Timeout Rules
1 hour for pending jobs
4 hours for defined jobs
12 hours for assigned jobs
7 days for throttled jobs
2 days for activated or starting jobs
4 hours for activated or starting jobs with job.currentPriority>=800 at queues where laststart in the SiteData table is older than 2 hours
30 min for sent jobs
21 days for running jobs
2 hours for heartbeats from running or starting jobs. Each workflow can define its own timeout value using HEARTBEAT_TIMEOUT_<workflow> in gdpconfig
the above HEARTBEAT_TIMEOUT_<workflow> for transferring jobs with that workflow and an own stage-out mechanism that sets a non-null job.jobSubStatus
3 hours for holding jobs with job.currentPriority>=800, while days for holding jobs with job.currentPriority<800
transtimehi days for transferring jobs with job.currentPriority>=800, while transtimelo days for transferring jobs with job.currentPriority<800
disable all timeout rules when the queue status is paused or the queue has disableReassign in catchall
fast rebrokerage for defined, assigned, activated, or starting jobs at the queues where
nQueue_queue(gshare)/nRun_queue(gshare) is larger than FAST_REBRO_THRESHOLD_NQNR_RATIO
nQueue_queue(gshare)/nQueue_total(gshare) is larger than FAST_REBRO_THRESHOLD_NQUEUE_FRAC
nQueue_queue(gshare) is larger than FAST_REBRO_THRESHOLD_NQUEUE_<gshare>; unless the gshare defines this parameter, it doesn't trigger the fast rebrokerage
FAST_REBRO_THRESHOLD_blah is defined in gdpconfig
nSlots is not defined in the Harvester_Slots table, since a defined nSlots intentionally causes a large nQueue when nRun is small
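The individual fast-rebrokerage conditions can be sketched as follows; how they are combined is decided by the actual timeout logic, and the names here are illustrative:

def fast_rebro_conditions(n_queue_at_queue, n_run_at_queue, n_queue_total,
                          gdpconfig, gshare):
    # evaluate the three fast-rebrokerage conditions for one gshare at one queue
    per_gshare_key = f"FAST_REBRO_THRESHOLD_NQUEUE_{gshare}"
    checks = {
        "nq_over_nr": n_run_at_queue > 0 and
            n_queue_at_queue / n_run_at_queue > gdpconfig["FAST_REBRO_THRESHOLD_NQNR_RATIO"],
        "nq_fraction": n_queue_total > 0 and
            n_queue_at_queue / n_queue_total > gdpconfig["FAST_REBRO_THRESHOLD_NQUEUE_FRAC"],
        # this check does not trigger unless the gshare defines the parameter
        "nq_absolute": per_gshare_key in gdpconfig and
            n_queue_at_queue > gdpconfig[per_gshare_key],
    }
    return checks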
Special Brokerage for ATLAS Full-chain
There is a mechanism in the ATLAS production task and job brokerages to assign an entire workflow (full-chain)
to a specific nucleus.
The main idea is to avoid data transfers between the nucleus and satellites, and to burst-process all data on the nucleus to deliver the final products quickly.
The nuclei are defined as bare nuclei by adding bareNucleus=only or bareNucleus=allow in the catchall field in CRIC. The former accepts only full-chain workflows while the latter accepts normal workflows in addition to full-chain workflows. Tasks can set the fullChain parameter to use the mechanism. The value can be
only to be assigned to a nucleus with bareNucleus=only,
require to be assigned to a nucleus with bareNucleus=only or bareNucleus=allow, or
capable.
When capable is set, the task is assigned to the same nucleus as that of the parent task only if the parent task was assigned to a bare nucleus. Otherwise, capable is ignored and the task can go to a normal nucleus.
Once a task with the fullChain parameter is assigned to a bare nucleus, the job brokerage sends jobs only to the queues associated with the nucleus. On the other hand, if a normal task is assigned to a bare nucleus with bareNucleus=allow, or a task with fullChain=capable is assigned to a normal nucleus, the job brokerage sends jobs to the queues associated with satellites in addition to the nucleus.
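The interplay between bareNucleus and fullChain can be summarized with a small sketch; this is a simplification of the eligibility rule, not the actual plugin code:

def nucleus_accepts_task(bare_nucleus, full_chain, parent_on_bare_nucleus=False):
    # bare_nucleus: None, 'allow', or 'only' (from the CRIC catchall field)
    # full_chain: None, 'only', 'require', or 'capable' (task parameter)
    if full_chain == "only":
        return bare_nucleus == "only"
    if full_chain == "require":
        return bare_nucleus in ("only", "allow")
    if full_chain == "capable":
        # simplified: the real brokerage pins the task to the parent's own nucleus
        # when the parent was assigned to a bare nucleus; otherwise 'capable' is
        # ignored and the task is treated like a normal task
        if parent_on_bare_nucleus:
            return bare_nucleus in ("only", "allow")
        return bare_nucleus != "only"
    # normal task: nuclei with bareNucleus=only accept only full-chain workflows
    return bare_nucleus != "only"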
ATLAS Analysis Job Brokerage
This is the ATLAS analysis job brokerage flow:
First, the brokerage counts the number of running/queued jobs/cores for the user (or the working group if the task specifies workingGroup), checks the global disk quota, and checks the classification of the task. The brokerage throttles the task (i.e., it does not submit jobs for the task) if ANY of the following conditions holds:
Number of User Jobs Exceeds Caps
the number of running jobs is larger than CAP_RUNNING_USER_JOBS
the number of queued jobs is larger than 2 × CAP_RUNNING_USER_JOBS
the number of running cores is larger than CAP_RUNNING_USER_CORES
the number of queued cores is larger than 2 × CAP_RUNNING_USER_CORES
For the working group, CAP_RUNNING_GROUP_JOBS and CAP_RUNNING_GROUP_CORES are used instead. All CAP_blah are defined in gdpconfig. (A sketch of these checks follows this list.)
User's Usage Exceeds Quota
the global disk quota is exceeded
User Analysis Share Usage Exceeds its Target and the Task is in a Lower Class
Currently this part of the throttle applies only to the User Analysis share; tasks in other shares are immune to it. (For task and site classification, see Site & Task Classification.)
the User Analysis task is in class C, and User Analysis share usage > 90% of its target
the User Analysis task is in class B, and User Analysis share usage > 95% of its target
the User Analysis task is in class A, and User Analysis share usage > max(USER_USAGE_THRESHOLD_A / (# of user's running slots in hi-sites), 1) × 100% of its target
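A sketch of the per-user cap checks above; the working-group variant just swaps in the CAP_RUNNING_GROUP_* parameters (names are illustrative):

def user_caps_exceeded(n_running_jobs, n_queued_jobs, n_running_cores,
                       n_queued_cores, gdpconfig):
    # True if any of the per-user job/core caps is exceeded
    cap_jobs = gdpconfig["CAP_RUNNING_USER_JOBS"]
    cap_cores = gdpconfig["CAP_RUNNING_USER_CORES"]
    return (n_running_jobs > cap_jobs
            or n_queued_jobs > 2 * cap_jobs
            or n_running_cores > cap_cores
            or n_queued_cores > 2 * cap_cores)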
Next, the brokerage generates the list of preliminary candidates as follows:
Take all queues with type=’analysis’ or ‘unified’.
Exclude queues if excludedSite is specified and the queues are included in the list.
Exclude queues if includedSite is specified and the queues are not included in the list. Pre-assigned queues are specified in includedSite or site.
Then, the brokerage filters out preliminary candidates that fail any of the following checks. There are two types of filters: filters for persistent issues and filters for temporary issues.
- Filters for persistent issues
The queue status must be online unless the queues are pre-assigned.
Input data locality check to skip queues if they don't have input data locally. This check is suppressed if taskPriority ≥ 2000 or ioIntensity ≤ IO_INTENSITY_CUTOFF_USER (defined in gdpconfig).
Check against the MAX_DISKIO_DEFAULT limit defined in gdpconfig. It is possible to overwrite the limit for a particular queue through the maxDiskIO (in kB/sec per core) field in CRIC. The limit is applied in job brokerage: when the average diskIO per core for running jobs in a queue exceeds the limit, the next cycles of job brokerage will exclude tasks with diskIO higher than the defined limit, to progressively get the diskIO under the threshold.
CPU core count matching.
Skip VP queues if the task specifies avoidVP or those queues are overloaded.
Availability of ATLAS release/cache. This check is skipped when queues have ANY in the releases field in CRIC. If queues have AUTO in the releases field, the brokerage uses the information published in a JSON by CRIC, as explained in the Release/cache Availability Check for releases=AUTO section above.
Queues publish maximum (and minimum) memory size per core. The expected memory size of each job is estimated for each queue as
\[(baseRamCount + ramCount \times coreCount) \times compensation\]
if ramCountUnit is MBPerCore, or
\[(baseRamCount + ramCount) \times compensation\]
if ramCountUnit is MB, where compensation is 0.9, avoiding sending jobs to high-memory queues when their expected memory usage is close to the lower limit. Queues are skipped if the estimated memory usage is not included in the acceptable memory ranges.
The disk usage for a job is estimated as
\[(0 \: or \: inputDiskCount) + outDiskCount \times inputDiskCount + workDiskCount\]
where inputDiskCount is the total size of job input files. The first term in the formula is zero if the queue is configured to read input files directly from the local storage. maxwdir is divided by coreCount at each queue and the resulting value must be larger than the expected disk usage.
Disk size check: free space in the local storage has to be over 200 GB.
Skip blacklisted storage endpoints.
Analysis job walltime is estimated using the same formula as that for production jobs. The estimated walltime must be between mintime and maxtime at the queue.
Skip queues without pilots for the last 3 hours.
- Filters for temporary issues
Skip queues if there are many jobs from the task closed or failed in the last TW_DONE_JOB_STAT hours: (nFailed + nClosed) must be less than max(2 × nFinished, MIN_BAD_JOBS_TO_SKIP_PQ). TW_DONE_JOB_STAT and MIN_BAD_JOBS_TO_SKIP_PQ are defined in gdpconfig.
Skip queues if defined + activated + assigned + starting > 2 × max(20, running).
Skip queues if the user has too many queued jobs there.
- Filters to control the user queue length on each PQ per gshare: Analysis Stabilizer
First, decide whether the PQ can be throttled. The PQ will NOT be throttled if it does not have enough queuing jobs, that is, when any of the following conditions is satisfied:
nQ(PQ) < BASE_QUEUE_LENGTH_PER_PQ
nQ(PQ) < BASE_EXPECTED_WAIT_HOUR_ON_PQ × trr(PQ)
where trr stands for to-running-rate, the number of jobs going from queuing to running per hour in the PQ; it is evaluated over jobs with starttime within the last 6 hours.
If the PQ does not meet any condition above, it can be throttled. Then compute nQ_max(PQ) (i.e., the affordable max queue length of each PQ), which is the maximum of the following values:
BASE_QUEUE_LENGTH_PER_PQ
STATIC_MAX_QUEUE_RUNNING_RATIO × nR(PQ)
MAX_EXPECTED_WAIT_HOUR × trr(PQ)
Next, compute what percentage of nQ_max(PQ) is for the user:
percentage(PQ, user) = max( nR(PQ, user)/nR(PQ) , 1/nUsers(PQ) )
Finally, the maximum of the following values, called max_q_len, will be used to throttle nQ(PQ, user):
BASE_DEFAULT_QUEUE_LENGTH_PER_PQ_USER
BASE_QUEUE_RATIO_ON_PQ × nR(PQ)
nQ_max(PQ) × percentage(PQ, user)
Thus, if nQ(PQ, user) > max_q_len, the brokerage will temporarily exclude the PQs in which the user of the task already has enough queuing jobs. (A sketch of this procedure follows the parameter list below.)
Parameters mentioned above: BASE_DEFAULT_QUEUE_LENGTH_PER_PQ_USER, BASE_EXPECTED_WAIT_HOUR_ON_PQ, BASE_QUEUE_LENGTH_PER_PQ, BASE_QUEUE_RATIO_ON_PQ, MAX_EXPECTED_WAIT_HOUR, and STATIC_MAX_QUEUE_RUNNING_RATIO are defined in gdpconfig.
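Putting the Analysis Stabilizer steps together, a compact sketch (the gdpconfig values and job counts are assumed to be given; names are illustrative):

def stabilizer_excludes_pq(nq_pq, nr_pq, nq_pq_user, nr_pq_user,
                           n_users_pq, trr_pq, cfg):
    # True if the Analysis Stabilizer temporarily excludes this PQ for the user;
    # trr_pq is the to-running-rate, cfg holds the gdpconfig values
    # the PQ is not throttled at all if it does not have enough queuing jobs
    if (nq_pq < cfg["BASE_QUEUE_LENGTH_PER_PQ"]
            or nq_pq < cfg["BASE_EXPECTED_WAIT_HOUR_ON_PQ"] * trr_pq):
        return False
    # affordable max queue length of the PQ
    nq_max = max(cfg["BASE_QUEUE_LENGTH_PER_PQ"],
                 cfg["STATIC_MAX_QUEUE_RUNNING_RATIO"] * nr_pq,
                 cfg["MAX_EXPECTED_WAIT_HOUR"] * trr_pq)
    # the user's share of that queue length
    percentage = max(nr_pq_user / nr_pq if nr_pq else 0.0, 1.0 / n_users_pq)
    max_q_len = max(cfg["BASE_DEFAULT_QUEUE_LENGTH_PER_PQ_USER"],
                    cfg["BASE_QUEUE_RATIO_ON_PQ"] * nr_pq,
                    nq_max * percentage)
    return nq_pq_user > max_q_len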
Finally, it calculates the brokerage weight for remaining candidates using the following formula.
Original basic weight
\[weight = \frac {running + 1} {(activated + assigned + starting + defined + 1)}\]
The brokerage takes the largest of the following numbers as the number of running jobs:
The actual number of running jobs at the queue, Rreal.
min(nBatchJob, 20) if Rreal < 20 and nBatchJob (the number of running + submitted batch workers at the PQ) > Rreal. This is mainly for bootstrapping.
numSlots if it is set to a positive number for the queue, for proactive job assignment.
The number of starting jobs if numSlots is set to zero, which is typically useful for Harvester to fetch jobs when the number of available slots dynamically changes.
Currently, the original basic weight is used in the brokerage.
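A sketch of the original basic weight and of the running-job substitution rules above (illustrative names; num_slots is None when the queue does not set numSlots):

def effective_running(real_running, n_batch_jobs, num_slots, starting):
    # largest candidate for the number of running jobs, per the rules above
    candidates = [real_running]
    if real_running < 20 and n_batch_jobs > real_running:
        candidates.append(min(n_batch_jobs, 20))   # mainly for bootstrapping
    if num_slots is not None and num_slots > 0:
        candidates.append(num_slots)               # proactive job assignment
    elif num_slots == 0:
        candidates.append(starting)                # Harvester-driven slot counts
    return max(candidates)

def original_basic_weight(running, activated, assigned, starting, defined):
    return (running + 1) / (activated + assigned + starting + defined + 1)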
New basic weight
New basic weight = \(W_s \cdot W_q\), where
- \(W_s\): weight from site class
For jobs in User Analysis and Express Analysis, \(W_s\) is given in the table below:
Task Class    hi-site           mid-site          lo-site
S or A        \(N_j\)           \(\epsilon_1\)    \(\epsilon_2\)
B             \(\epsilon_2\)    \(N_j\)           \(\epsilon_1\)
C             \(\epsilon_2\)    \(\epsilon_1\)    \(N_j\)
where
\(N_j\): estimated number of jobs to submit in the brokerage cycle
\(\epsilon_1\), \(\epsilon_2\): some small positive numbers (not fixed) to avoid dead zero; \(\epsilon_1 > \epsilon_2\)
The purpose is to broker time-sensitive tasks preferentially to high-appropriateness (hi) analysis sites, and vice versa.
(For task and site classification, see Site & Task Classification)
For other analysis shares (group shares, ART, etc.): \(W_s = N_j\) . (Task- and site-classification are NOT considered)
- \(W_q\): weight from queue length
\[W_{q} = \frac { \text{max_q_len}(PQ, user) - \text{nQ}(PQ, user) - N_{j} / N_{PQ} } { ( \sum_{\text{candidate PQs}} ( \text{max_q_len}(PQ, user) - \text{nQ}(PQ, user) ) ) - N_{j} }\]
where \(N_{PQ}\) is the number of candidate PQs; \(\text{nQ}\) and \(\text{max_q_len}\) are defined in the Analysis Stabilizer above.
The purpose is to control the queue length in the same way as the Analysis Stabilizer.
The new basic weight will soon be deployed to replace the original basic weight.
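A sketch of the \(W_q\) term for one candidate PQ, assuming max_q_len and nQ have already been computed as in the Analysis Stabilizer (names are illustrative):

def queue_length_weight(max_q_len_user, nq_user, n_jobs, n_candidate_pqs,
                        sum_headroom_all_pqs):
    # sum_headroom_all_pqs = sum over candidate PQs of (max_q_len(PQ, user) - nQ(PQ, user))
    numerator = max_q_len_user - nq_user - n_jobs / n_candidate_pqs
    denominator = sum_headroom_all_pqs - n_jobs
    return numerator / denominator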
If all queues are skipped due to persistent issues, the brokerage tries to find candidates without the input data locality check. If all queues are still skipped, the task is pending for 20 min. Otherwise, the remaining candidates are sorted by weight, and the best 10 candidates are taken.