Skip to content

Commit d813f84

Browse files
authored
Merge pull request #8404 from chaen/v9.0_speed_moreFCSpeadup
feat (DFC Client): speedup getReplicas by sorting LFNs
2 parents 4db5684 + 26abc43 commit d813f84

File tree

3 files changed

+18
-17
lines changed

3 files changed

+18
-17
lines changed

src/DIRAC/Resources/Catalog/FileCatalogClient.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,9 @@ def getReplicas(self, lfns, allStatus=False, timeout=120):
141141
successful = {}
142142
failed = {}
143143

144-
for chunk in breakListIntoChunks(lfns, GET_REPLICAS_CHUNK_SIZE):
144+
# We want to sort the lfns because of the way the server groups the db queries by
145+
# directory. So if we sort them, the grouping is more efficient.
146+
for chunk in breakListIntoChunks(sorted(lfns), GET_REPLICAS_CHUNK_SIZE):
145147
rpcClient = self._getRPC(timeout=timeout)
146148
result = rpcClient.getReplicas(chunk, allStatus)
147149

src/DIRAC/Resources/Catalog/Utilities.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
1-
""" DIRAC FileCatalog client utilities
2-
"""
1+
"""DIRAC FileCatalog client utilities"""
2+
33
import os
44
import errno
55
import functools
@@ -16,7 +16,7 @@ def checkArgumentDict(path):
1616
"""Check and process format of the arguments to FileCatalog methods"""
1717
if isinstance(path, str):
1818
urls = {path: True}
19-
elif isinstance(path, list):
19+
elif isinstance(path, (list, set)):
2020
urls = {}
2121
for url in path:
2222
urls[url] = True

src/DIRAC/TransformationSystem/Agent/TransformationAgent.py

Lines changed: 12 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -500,19 +500,18 @@ def __getDataReplicas(self, transDict, lfns, clients, forJobs=True):
500500
startTime = time.time()
501501
self._logInfo(f"Getting replicas for {len(newLFNs)} files from catalog", method=method, transID=transID)
502502
newReplicas = {}
503-
for chunk in breakListIntoChunks(newLFNs, 10000):
504-
res = self._getDataReplicasDM(transID, chunk, clients, forJobs=forJobs)
505-
if res["OK"]:
506-
reps = {lfn: ses for lfn, ses in res["Value"].items() if ses}
507-
newReplicas.update(reps)
508-
self.__updateCache(transID, reps)
509-
else:
510-
self._logWarn(
511-
f"Failed to get replicas for {len(chunk)} files",
512-
res["Message"],
513-
method=method,
514-
transID=transID,
515-
)
503+
res = self._getDataReplicasDM(transID, newLFNs, clients, forJobs=forJobs)
504+
if res["OK"]:
505+
newReplicas = {lfn: ses for lfn, ses in res["Value"].items() if ses}
506+
507+
self.__updateCache(transID, newReplicas)
508+
else:
509+
self._logWarn(
510+
f"Failed to get replicas for {len(newLFNs)} files",
511+
res["Message"],
512+
method=method,
513+
transID=transID,
514+
)
516515

517516
self._logInfo(
518517
f"Obtained {len(newReplicas)} replicas from catalog in {time.time() - startTime:.1f} seconds",

0 commit comments

Comments
 (0)