public class TransferEngine extends Engine
Modifier and Type | Field and Description |
---|---|
static int | DELETED_JOBS_LEVEL - The MAX level is assigned as the level for deleted jobs. |
static String | FILE_URL_SCHEME - The scheme name for file URLs. |
private ADag | mDag - The DAG object to which the transfer nodes are to be added. |
protected boolean | mDeepStorageStructure - A boolean indicating whether to use a deep directory structure for the storage directory. |
private List | mDeletedJobs - Holds all the jobs deleted by the reduction algorithm. |
private List | mDeletedLeafJobs - Holds the jobs from the original DAGs that are deleted by the reduction algorithm. |
private org.griphyn.vdl.euryale.FileFactory | mFactory - The handle to the file factory that is used to create the top-level directories for each of the partitions. |
private ReplicaCatalogBridge | mRCBridge - The bridge to the Replica Catalog. |
private ReplicaSelector | mReplicaSelector - The handle to the replica selector that is used to select among the various replicas. |
private SLS | mSLS - The handle to the SLS implementor. |
private Map<String,NameValue> | mSRMServiceURLToMountPointMap - A map that associates the site name with the SRM server URL and mount point. |
private String | mStageOutBaseDirectory - The base path of the stage-out directory on the output site, where all the files are staged out. |
protected String | mStorageDir - The storage directory relative to the SE mount point of the pool. |
private ReplicaCatalog | mTransientRC - A SimpleFile Replica Catalog that tracks all the files being materialized as part of workflow execution. |
private Refiner | mTXRefiner - The handle to the transfer refiner that adds the transfer nodes into the workflow. |
protected boolean | mUseSymLinks - If set, causes the source URL for the pull nodes from the RLS to be a file:// URL when the pool attribute associated with the PFN is the same as the job's execution pool. |
protected String | mWorkDir - The working directory relative to the mount point of the execution pool. |
private boolean | mWorkerNodeExecution - A boolean indicating whether we are doing worker node execution or not. |
static String | SRM_MOUNT_POINT_PROPERTIES_SUFFIX - The suffix used to retrieve the mount point for the SRM server. |
static String | SRM_PROPERTIES_PREFIX - The property prefix for retrieving SRM properties. |
static String | SRM_SERVICE_URL_PROPERTIES_SUFFIX - The suffix used to retrieve the service URL for the SRM server. |
static String | SYMLINK_URL_SCHEME - The scheme name for symlink URLs. |
Fields inherited from class Engine: mBag, mLogger, mLogMsg, mOutputPool, mPoolFile, mPOptions, mProps, mRLIUrl, mSiteStore, mTCFile, mTCHandle, mTCMode, REGISTRATION_UNIVERSE, TRANSFER_UNIVERSE
Constructor and Description |
---|
TransferEngine(ADag reducedDag, PegasusBag bag, List<Job> deletedJobs, List<Job> deletedLeafJobs) - Overloaded constructor. |
Modifier and Type | Method and Description |
---|---|
void | addTransferNodes(ReplicaCatalogBridge rcb, ReplicaCatalog transientCatalog) - Adds the transfer nodes to the workflow. |
private FileTransfer | constructFileTX(PegasusFile pf, String stagingSiteHandle, String destSiteHandle, String job, String path, boolean localTransfer) - Constructs the FileTransfer object on the basis of the transiency information. |
private Map<String,NameValue> | constructSiteToSRMServerMap(PegasusProperties props) - Constructs a map by parsing the relevant SRM Pegasus properties. |
private Vector | getDeletedFileTX(String pool, Job job) - Gets the file transfer objects corresponding to the locations of files found in the replica mechanism, and transfers them to the output pool requested by the user. |
private void | getFilesFromRC(DAGJob job, Collection searchFiles) - Special handling for a DAGJob when retrieving files from the Replica Catalog. |
private void | getFilesFromRC(DAXJob job, Collection searchFiles) - Special handling for a DAXJob when retrieving files from the Replica Catalog. |
private void | getFilesFromRC(Job job, Collection searchFiles) - Looks up the locations of the files in the RCEngine Hashtable and adds nodes to transfer them. |
private Vector | getFileTX(String destPool, Job job, boolean localTransfer) - Gets the Vector of FileTransfer objects for the files that have to be transferred to a single destination pool. |
private Collection<FileTransfer>[] | getInterpoolFileTX(Job job, Vector nodes) - Gets the FileTransfer objects for all the files that have to be transferred to the destination pool in the case of interpool transfers. |
private Set | getOutputFiles(Vector nodes, Vector parentSubs) - Gets the output files for all the nodes specified in the nodes Vector. |
protected String | getPathOnStageoutSite(String lfn) - Returns the full path on the remote output site where the LFN will reside. |
String | getStagingSite(Job job) - Returns the staging site to be used for a job. |
private Job | getSubInfo(String jobName) - Returns the Job object for the specified job. |
protected void | initializeStageOutSiteDirectoryFactory(ADag workflow) - Initializes the stage-out site directory factory. |
private String | poolNotFoundMsg(String poolName, String universe) - Generates an error message for a pool that is not found in the pool config file. |
private void | processParents(Job job, Vector vParents) - Processes a node's parents and determines whether nodes are to be added. |
protected String | replaceProtocolFromURL(String pfn) - Replaces the gsiftp URL scheme in the URL with the symlink URL scheme and returns the result as a new object. |
protected ReplicaCatalogEntry | replaceSourceProtocolFromURL(ReplicaCatalogEntry rce) - Replaces the SRM URL scheme in the URL with the file URL scheme and, if a replacement happens, returns the result as a new object. |
boolean | runTransferOnLocalSite(String site, String destinationURL, int type) - Returns whether to run a transfer job on the local site. |
protected void | trackInTransientRC(Job job) - Tracks the files created by a job in the Transient Replica Catalog. |
private void | trackInTransientRC(String lfn, String pfn, String site) - Inserts an entry into the Transient RC. |
private void | trackInTransientRC(String lfn, String pfn, String site, boolean modifyURL) - Inserts an entry into the Transient RC. |
Methods inherited from class Engine: addVector, appendArrayList, loadProperties, printVector, stringInList, stringInPegVector, stringInVector, vectorToString
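The mUseSymLinks field and replaceProtocolFromURL together describe a rule: when symlinking is enabled and a replica resides on the same site the job executes on, the source URL is rewritten so the file can be linked rather than copied. The sketch below illustrates that rule following the gsiftp-to-symlink replacement described for replaceProtocolFromURL. The class and method names (SymlinkRuleSketch, chooseSourceURL) and the scheme strings "gsiftp" and "symlink" are assumptions for illustration, not the actual TransferEngine code.

```java
import java.util.Objects;

/**
 * Hypothetical sketch of the symlink rule. The scheme values are assumed;
 * TransferEngine's real SYMLINK_URL_SCHEME constant may differ.
 */
final class SymlinkRuleSketch {

    static final String GSIFTP_SCHEME = "gsiftp";      // assumed source scheme
    static final String SYMLINK_URL_SCHEME = "symlink"; // assumed constant value

    /**
     * Returns the source URL to use for a stage-in transfer: the symlink URL
     * when symlinking is on, the replica is on the execution site and the PFN
     * uses gsiftp; otherwise the PFN unchanged.
     */
    static String chooseSourceURL(boolean useSymLinks, String replicaSite,
                                  String executionSite, String pfn) {
        boolean sameSite = Objects.equals(replicaSite, executionSite);
        if (useSymLinks && sameSite && pfn.startsWith(GSIFTP_SCHEME + "://")) {
            // Keep "://host/path" and swap only the scheme name.
            return SYMLINK_URL_SCHEME + pfn.substring(GSIFTP_SCHEME.length());
        }
        return pfn; // different site or symlinking disabled: regular copy
    }

    public static void main(String[] args) {
        String pfn = "gsiftp://host.example.org/data/f.a";
        System.out.println(chooseSourceURL(true, "siteA", "siteA", pfn));
        System.out.println(chooseSourceURL(true, "siteB", "siteA", pfn));
    }
}
```

Running main prints symlink://host.example.org/data/f.a for the same-site case and leaves the gsiftp URL untouched when the replica lives on another site.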
public static final int DELETED_JOBS_LEVEL
public static final String FILE_URL_SCHEME
public static final String SYMLINK_URL_SCHEME
public static final String SRM_PROPERTIES_PREFIX
public static final String SRM_SERVICE_URL_PROPERTIES_SUFFIX
public static final String SRM_MOUNT_POINT_PROPERTIES_SUFFIX
private Map<String,NameValue> mSRMServiceURLToMountPointMap
private ADag mDag
private ReplicaCatalogBridge mRCBridge
private ReplicaSelector mReplicaSelector
private Refiner mTXRefiner
private List mDeletedJobs
private List mDeletedLeafJobs
private ReplicaCatalog mTransientRC
private org.griphyn.vdl.euryale.FileFactory mFactory
private String mStageOutBaseDirectory
protected String mWorkDir
protected String mStorageDir
protected boolean mDeepStorageStructure
protected boolean mUseSymLinks
private boolean mWorkerNodeExecution
private SLS mSLS
public TransferEngine(ADag reducedDag, PegasusBag bag, List<Job> deletedJobs, List<Job> deletedLeafJobs)
reducedDag - the reduced workflow.
bag - bag of initialization objects.
deletedJobs - list of all jobs deleted by the reduction algorithm.
deletedLeafJobs - list of leaf jobs deleted by the reduction algorithm.

public boolean runTransferOnLocalSite(String site, String destinationURL, int type)
site - the site handle associated with the destination URL.
destinationURL - the destination URL.
type - the type of transfer job for which the URL is being constructed.

private Job getSubInfo(String jobName)
jobName - the name of the job.

public void addTransferNodes(ReplicaCatalogBridge rcb, ReplicaCatalog transientCatalog)
rcb - the bridge to the ReplicaCatalog.
transientCatalog - an instance of the replica catalog that will store the locations of the files on the remote sites.

public String getStagingSite(Job job)
job - the job for which to determine the staging site.

private Vector getDeletedFileTX(String pool, Job job)
pool - the output pool that the user specifies at runtime.
job - the Job object corresponding to the leaf job that was deleted by the reduction algorithm.
Returns: a Vector of FileTransfer objects.

private void processParents(Job job, Vector vParents)
job - the Job object containing all the details of the job.
vParents - Vector of String objects corresponding to the parents of the node.

private Vector getFileTX(String destPool, Job job, boolean localTransfer)
destPool - the pool to which the files are to be transferred.
job - the Job object of the job whose output files are needed at the destination pool.
localTransfer - boolean indicating that the associated transfer job will run on the local site.
Returns: a Vector of FileTransfer objects.

private FileTransfer constructFileTX(PegasusFile pf, String stagingSiteHandle, String destSiteHandle, String job, String path, boolean localTransfer)
pf - the PegasusFile for which the transfer has to be done.
stagingSiteHandle - the staging site at which the file is placed after execution.
destSiteHandle - the output pool to which the file should be transferred.
job - the name of the associated job.
path - the path that a user specifies in the profile for the key remote_initialdir, which results in the workdir being changed for a job on an execution pool.
localTransfer - boolean indicating that the associated transfer job will run on the local site.

private String poolNotFoundMsg(String poolName, String universe)
poolName - the name of the pool that is not found.
universe - the Condor universe.

private Collection<FileTransfer>[] getInterpoolFileTX(Job job, Vector nodes)
job - the job with reference to which interpool file transfers need to be determined.
nodes - Vector of Job objects for the nodes whose output files are to be transferred to the destination pool.
Returns: an array of collections of FileTransfer objects.

private void getFilesFromRC(DAGJob job, Collection searchFiles)
job - the DAGJob.
searchFiles - the files that need to be looked up in the Replica Catalog.

private void getFilesFromRC(DAXJob job, Collection searchFiles)
job - the DAXJob.
searchFiles - the files that need to be looked up in the Replica Catalog.

private void getFilesFromRC(Job job, Collection searchFiles)
job - the Job object whose input files have to be looked up in the Replica Mechanism.
searchFiles - Vector containing the PegasusFile objects corresponding to the files whose mappings need to be looked up in the Replica Mechanism.

protected ReplicaCatalogEntry replaceSourceProtocolFromURL(ReplicaCatalogEntry rce)
rce - the ReplicaCatalogEntry object whose URL needs to be replaced.

protected String replaceProtocolFromURL(String pfn)
pfn - the PFN that needs to be replaced.

private Map<String,NameValue> constructSiteToSRMServerMap(PegasusProperties props)
Constructs a map by parsing the relevant SRM Pegasus properties. For example, if the following properties are set:
pegasus.transfer.srm.ligo-cit.service.url srm://osg-se.ligo.caltech.edu:10443/srm/v2/server?SFN=/mnt/hadoop
pegasus.transfer.srm.ligo-cit.service.mountpoint /mnt/hadoop
then a Map is created that associates ligo-cit with a NameValue object containing the service URL and mount point.
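A minimal sketch of this parsing follows, using java.util.Properties in place of PegasusProperties and a hypothetical SrmEndpoint record in place of NameValue. The prefix and suffix strings are inferred from the property names in the example above and are assumptions about the values of SRM_PROPERTIES_PREFIX, SRM_SERVICE_URL_PROPERTIES_SUFFIX and SRM_MOUNT_POINT_PROPERTIES_SUFFIX. The srmToFileURL helper is also hypothetical: it shows one plausible way such a map could support the SRM-to-file rewrite described for replaceSourceProtocolFromURL, by swapping the service URL prefix for file:// plus the mount point.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

/** Hypothetical sketch; requires Java 16+ for the record type. */
final class SrmMountPointSketch {

    // Assumed values, inferred from the example property names.
    static final String SRM_PROPERTIES_PREFIX = "pegasus.transfer.srm.";
    static final String SRM_SERVICE_URL_PROPERTIES_SUFFIX = ".service.url";
    static final String SRM_MOUNT_POINT_PROPERTIES_SUFFIX = ".service.mountpoint";

    /** Stand-in for NameValue: an SRM service URL paired with its mount point. */
    record SrmEndpoint(String serviceURL, String mountPoint) {}

    /** Builds site handle -> (service URL, mount point) from the properties. */
    static Map<String, SrmEndpoint> siteToSRMServer(Properties props) {
        Map<String, SrmEndpoint> result = new HashMap<>();
        int minLength = SRM_PROPERTIES_PREFIX.length()
                + SRM_SERVICE_URL_PROPERTIES_SUFFIX.length();
        for (String key : props.stringPropertyNames()) {
            if (!key.startsWith(SRM_PROPERTIES_PREFIX)
                    || !key.endsWith(SRM_SERVICE_URL_PROPERTIES_SUFFIX)
                    || key.length() <= minLength) {
                continue; // not a "<prefix><site><service-url-suffix>" key
            }
            // The site handle sits between prefix and suffix, e.g. "ligo-cit".
            String site = key.substring(SRM_PROPERTIES_PREFIX.length(),
                    key.length() - SRM_SERVICE_URL_PROPERTIES_SUFFIX.length());
            String mountPoint = props.getProperty(
                    SRM_PROPERTIES_PREFIX + site + SRM_MOUNT_POINT_PROPERTIES_SUFFIX);
            if (mountPoint != null) { // keep only complete URL/mount-point pairs
                result.put(site, new SrmEndpoint(props.getProperty(key), mountPoint));
            }
        }
        return result;
    }

    /** Rewrites an SRM URL to a file URL when it falls under the site's service URL. */
    static String srmToFileURL(String url, String site, Map<String, SrmEndpoint> map) {
        SrmEndpoint endpoint = map.get(site);
        if (endpoint == null || !url.startsWith(endpoint.serviceURL())) {
            return url; // no mapping for this site or URL: leave unchanged
        }
        String rest = url.substring(endpoint.serviceURL().length());
        return "file://" + endpoint.mountPoint() + rest;
    }

    public static void main(String[] args) {
        Properties p = new Properties();
        p.setProperty("pegasus.transfer.srm.ligo-cit.service.url",
                "srm://osg-se.ligo.caltech.edu:10443/srm/v2/server?SFN=/mnt/hadoop");
        p.setProperty("pegasus.transfer.srm.ligo-cit.service.mountpoint", "/mnt/hadoop");
        Map<String, SrmEndpoint> map = siteToSRMServer(p);
        System.out.println(srmToFileURL(
                "srm://osg-se.ligo.caltech.edu:10443/srm/v2/server?SFN=/mnt/hadoop/data/f.a",
                "ligo-cit", map));
    }
}
```

With the example properties, main prints file:///mnt/hadoop/data/f.a. Keying the map by site handle keeps the lookup to a single get per URL.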
props - the PegasusProperties object.

private Set getOutputFiles(Vector nodes, Vector parentSubs)
nodes - Vector of the job names of the nodes whose output files are required.
parentSubs - Vector of Job objects. Pass an empty Vector as this parameter; it is populated with the Job objects of the nodes as their output files are determined.

protected String getPathOnStageoutSite(String lfn)
lfn - the logical filename of the file.

protected void initializeStageOutSiteDirectoryFactory(ADag workflow)
workflow - the workflow to which the transfer nodes need to be added.

protected void trackInTransientRC(Job job)
job - the job whose input files need to be tracked.

private void trackInTransientRC(String lfn, String pfn, String site)
lfn - the logical name of the file.
pfn - the PFN.
site - the site handle.

private void trackInTransientRC(String lfn, String pfn, String site, boolean modifyURL)
lfn - the logical name of the file.
pfn - the PFN.
site - the site handle.
modifyURL - whether to modify the URL in the case of S3.

Copyright © 2011 The University of Southern California. All Rights Reserved.