Deploying an Actor¶
- What is an actor? See more info in our documentation.
Basically, a Tapis actor is a script that lives in the cloud and does something for you. It's not for compute-intensive jobs (that's what apps are for); it's designed to be quick, responsive, and lightweight.
We're going to deploy an actor that will detect when a file is uploaded, create a Tapis job.json, and submit that job to our FastQC application.
Create an Upload Folder¶
Now we'll create the uploads folder on our compute system. After we deploy our actor, any file uploaded here will be analyzed automatically by our FastQC app! Let's ssh into frontera and create the folder with bash:
ssh $USERNAME@frontera.tacc.utexas.edu
cdw
mkdir uploads
cd uploads
pwd
Make sure to copy this path (e.g. /work2/05369/urrutia/frontera/uploads); we'll need it when we deploy the actor.
And go ahead and copy this fastq file into your uploads directory:
cp /work2/05369/urrutia/frontera/fastq/input.fastq $WORK/uploads
ls $WORK/uploads
Note
For those deploying mriqc, it'll be:
mkdir bids
rsync -rlt /work2/05369/urrutia/frontera/bids/ds001_BIDS $WORK/bids
Deploying an Actor¶
We can move to the fastqc_actor directory in the same repository we downloaded earlier:
cd ../fastqc_actor
Edit reactor.py and job.json, and build the container¶
reactor.py is the script that runs when the actor is invoked. We'll edit the UPLOAD_DIR here to match the directory you'll be using to upload data:
import os
import json
from pathlib import Path
from tapipy import actors, errors

JOB = Path("/opt/job.json")
UPLOAD_DIR = '/work2/05369/urrutia/frontera/uploads'

def main() -> None:
    context = actors.get_context()  # type: ignore
    # print(json.dumps(context, indent=4))

    # Read in our job definition json to use for submitting a job
    with open(JOB, "r") as f:
        job = json.load(f)

    # For people who don't have docker, you can optionally define path/appId in a message to the actor
    upload_dir = context.message_dict.get('upload_dir')
    if upload_dir is None:
        upload_dir = os.path.normpath(UPLOAD_DIR)
    else:
        upload_dir = os.path.normpath(upload_dir)
    appid = context.message_dict.get('appId')
    if appid is not None:
        job['appId'] = appid

    # First we need to get our tapis client so we can interact with systems and submit jobs
    t = actors.get_client()
    # there is a bug in the base url, this is a simple patch to remove the trailing slash
    t.base_url = t.base_url.strip('/')
    # Let's check our upload directory to see all the files that have been uploaded
    files = t.files.listFiles(systemId='frontera', path=upload_dir)
    # files.listFiles returns tapipy file objects; we just need a list of paths,
    # so we can use a list comprehension to pull them out
    input_files = [file.path for file in files]

    # We'll use a "submitted.txt" file to keep track of inputs that have been submitted.
    # The first time this runs that file won't exist, so we wrap this step in a try/except
    # and create the submitted list ourselves on the first run
    try:
        # Try reading in the contents of submitted.txt
        submitted_file = t.files.getContents(systemId='frontera', path=upload_dir + '/submitted.txt')
        # files.getContents returns the file contents as bytes,
        # so we decode each line and collect them into a list
        submitted = [sub.decode() for sub in submitted_file.splitlines()]
    except errors.NotFoundError:
        # On the first run, seed the list with "submitted.txt" itself,
        # so that new file doesn't trigger another job submission
        submitted = [upload_dir.strip('/') + '/submitted.txt']

    # Another list comprehension: files that are in the upload directory
    # but not recorded in the "submitted.txt" file
    unsubmitted = [path for path in input_files if path not in submitted]
    # For any input that hasn't been submitted, let's create a new job
    for path in unsubmitted:
        print(path)
        # set the input for the job
        job['parameterSet']['appArgs'][0]['arg'] = '/' + path
        # set the archive location for the job
        job['archiveSystemDir'] = upload_dir.replace('uploads', 'products')
        # submit the job
        resp = t.jobs.submitJob(**job)
        # print the job uuid so we'll have it in the logs
        print("Submitted Job: {}".format(resp.uuid))
        # add the input to our "submitted" list
        submitted.append(path)

    # If this actor didn't submit any jobs (and this isn't the first run),
    # we don't need to update "submitted.txt" and can just close out
    if not unsubmitted and len(submitted) > 1:
        print("No new uploads detected")
        return
    # Save a local version of submitted.txt
    with open('submitted.local', 'w') as fp:
        for item in submitted:
            # write each item on a new line
            fp.write("%s\n" % item)
    # Upload our local, updated version of submitted.txt to the upload dir so we don't resubmit the same jobs
    t.upload(source_file_path="submitted.local", system_id="frontera", dest_file_path=upload_dir + "/submitted.txt")
    print("updated submitted.txt")


if __name__ == "__main__":
    main()
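The bookkeeping at the heart of reactor.py is plain Python: decode the bytes that files.getContents returns for submitted.txt into a list of paths, then use a list comprehension to find uploads that haven't been submitted yet. A standalone sketch of that logic (the file contents and paths here are made up for illustration):

```python
# Simulated contents of submitted.txt, as bytes, the way files.getContents returns them
submitted_file = (
    b"work2/05369/urrutia/frontera/uploads/submitted.txt\n"
    b"work2/05369/urrutia/frontera/uploads/input.fastq\n"
)

# Decode each line into a plain string, exactly as the reactor does
submitted = [sub.decode() for sub in submitted_file.splitlines()]

# Paths currently in the upload directory (as files.listFiles would report them)
input_files = [
    "work2/05369/urrutia/frontera/uploads/submitted.txt",
    "work2/05369/urrutia/frontera/uploads/input.fastq",
    "work2/05369/urrutia/frontera/uploads/input2.fastq",
]

# Anything in the directory but not recorded in submitted.txt needs a job
unsubmitted = [path for path in input_files if path not in submitted]
print(unsubmitted)  # only input2.fastq is new
```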
And change the name of the app in job.json, so it matches your app id:
{
    "name": "fastqc",
    "appId": "urrutia-fastqc",
    "appVersion": "3.0.0",
    "archiveSystemDir": "${JobWorkingDir}",
    "archiveOnAppError": false,
    "parameterSet": {
        "appArgs": [
            {"name": "inputFastq", "arg": "/work2/05369/urrutia/frontera/inputs/input.fastq"}
        ],
        "schedulerOptions": [
            {
                "name": "TACC Allocation",
                "arg": "-A Frontera-Training"
            }
        ]
    },
    "moduleLoads": [
        {
            "moduleLoadCommand": "module load",
            "modulesToLoad": [
                "tacc-apptainer"
            ]
        }
    ]
}
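Before each submission the reactor tweaks this dictionary in place: the first appArgs entry gets the newly uploaded file, and the archive directory is derived from the upload directory by swapping uploads for products. A minimal sketch of those two rewrites (a trimmed job dict and illustrative paths, not the full definition):

```python
# A trimmed-down job definition, just the fields the reactor rewrites
job = {
    "name": "fastqc",
    "appId": "urrutia-fastqc",
    "parameterSet": {"appArgs": [{"name": "inputFastq", "arg": ""}]},
}

upload_dir = "/work2/05369/urrutia/frontera/uploads"
path = "work2/05369/urrutia/frontera/uploads/input.fastq"  # as listed by files.listFiles

# Point the app at the uploaded file (listed paths lack the leading slash, so add it back)
job["parameterSet"]["appArgs"][0]["arg"] = "/" + path

# Archive results next to the uploads, in a sibling "products" folder
job["archiveSystemDir"] = upload_dir.replace("uploads", "products")

print(job["archiveSystemDir"])  # /work2/05369/urrutia/frontera/products
```

This is why the later step checks `ls $WORK/products` for output: the archive directory is always the products sibling of wherever you upload.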
Build and push the actor:
docker build -t $USERNAME/fastqc_actor:0.0.1 .
docker push $USERNAME/fastqc_actor:0.0.1
# or on an M1 chip
docker buildx build --platform linux/amd64 -t $USERNAME/fastqc_actor:0.0.1 --push .
Deploy the Actor¶
All that’s left is to deploy our reactor:
import json
actor = {
"image": "$USERNAME/fastqc_actor:0.0.1",
"stateless": True,
"token": True,
"cron": True,
"cron_schedule": "now + 1 week"
}
t.actors.create_actor(**actor)
# you can also manually trigger an actor with:
# t.actors.send_message(actor_id='$ACTOR_ID', message={"test":"message"})
Warning
If you didn’t build the container, you can use the one I created above, but you’ll need to define upload_dir and appId in a message:
t.actors.send_message(actor_id='$ACTOR_ID', message={"upload_dir":"$PATH","appId":"$APPID"})
You should see a response like:
_links:
executions: https://tacc.tapis.io//v3/actors/B741py883o7Y8/executions
owner: https://tacc.tapis.io//v3/oauth2/profiles/urrutia
create_time: 2023-07-11T15:58:27.624476
cron_next_ex: None
cron_on: False
cron_schedule: None
default_environment:
description:
gid: None
hints: []
id: B741py883o7Y8
image: jurrutia/fastqc_actor:0.0.1
last_update_time: 2023-07-11T15:58:27.624476
link:
log_ex: None
max_cpus: None
max_workers: None
mem_limit: None
mounts: [
container_path: /home/tapis/runtime_files/_abaco_data1
host_path: /home/apim/prod/runtime_files/data1
mode: ro,
container_path: /home/tapis/runtime_files/_abaco_data2
host_path: /home/apim/prod/runtime_files/data2/tacc/urrutia
mode: rw]
name: None
owner: urrutia
privileged: False
queue: default
revision: 1
run_as_executor: False
state:
stateless: True
status: SUBMITTED
status_message:
tasdir: None
token: True
type: none
uid: None
use_container_uid: False
webhook:
Copy your actor id (B741py883o7Y8 in the above example). If you forget the id, you can always list your actors with t.actors.list_actors().
Test the Actor execution¶
Now the only thing left to do is to test whether our upload -> reactor -> app chain is functioning. Since we've already uploaded a fastq file to the uploads folder, let's see if our actor was triggered:
t.actors.list_executions(actor_id='$ACTOR_ID')
The response should look like:
_links:
owner: https://tacc.tapis.io//v3/oauth2/profiles/urrutia
actor_id: 8qB5RA33VMlN6
api_server: https://tacc.tapis.io/
executions: [
finish_time: Wed, 12 Jul 2023 20:45:54 GMT
id: jeOJBJyObDWo
message_received_time: Wed, 12 Jul 2023 20:45:38 GMT
start_time: Wed, 12 Jul 2023 20:45:53 GMT
status: COMPLETE]
owner: urrutia
total_cpu: 88536648
total_executions: 1
total_io: 180
total_runtime: 1
If you want to see the logs from your actor execution you can run with one of the execution ids from the response above:
t.actors.get_execution_logs(actor_id='$ACTOR_ID', execution_id='$EXECUTION_ID')
For the first execution the logs should read:
_links:
execution: https://tacc.tapis.io//v3/actors/8qB5RA33VMlN6/executions/8zPjejRxNX3W1
owner: https://tacc.tapis.io//v3/oauth2/profiles/urrutia
logs: work2/05369/urrutia/frontera/uploads/input.fastq
Submitted Job: ea15ef53-cfe9-4191-9672-38cf5f7af970-007
updated submitted.txt
We can inspect our job's record with:
t.jobs.getJob(jobUuid='$JOB_UUID')
And finally let’s check our products folder to see if we have the output we expect:
ls $WORK/products
Congratulations, you successfully automated part of your workflow with Tapis! But there's no reason to stop here: you could add a notification to your FastQC jobs to trigger a new reactor (and perform an alignment, maybe?) and build an entirely automated workflow by chaining together reactors and apps.
Once you’re finished please remove the cron or delete your actor:
# update to remove cron
t.actors.update_actor(actor_id='$ACTOR_ID', image="$DOCKER_USERNAME/$REPO:$TAG", cron=False)
# Delete
t.actors.delete_actor(actor_id='$ACTOR_ID')