Deploying an Actor

What is an actor? See more info in our documentation:

Basically a Tapis actor is a script, that lives in the cloud, and does something for you. It’s not for compute intensive jobs, that’s what apps are for, it’s designed to be quick, responsive, and lightweight.

We’re going to deploy an actor that will detect when a file is uploaded, create a Tapis job.json, and submit that job to our FastQC application.

Create an Upload Folder

Now we’ll create the uploads folder on our compute system. After we deploy our actor, any file that is uploaded here will be analyzed automatically by our FastQC app! Let’s ssh into frontera and create the folder w/ bash:

ssh $USERNAME@frontera.tacc.utexas.edu
cdw
mkdir uploads
cd uploads
pwd

Make sure to copy this path, ex /work2/05369/urrutia/frontera/uploads we’ll need it when we deploy the actor.

And go ahead and copy this fastq file into your uploads directory:

cp /work2/05369/urrutia/frontera/fastq/input.fastq $WORK/uploads
ls $WORK/uploads

Note

for those deploying mriqc it’ll be:

mkdir bids
rsync -rlt /work2/05369/urrutia/frontera/bids/ds001_BIDS $WORK/bids

Deploying an Actor

We can move the the fastqc_actor directory in that same repository we downloaded earlier:

cd ../fastqc_actor

Edit reactor.py, job.json and build container

Reactor.py is the script that runs when an actor is invoked. We’ll edit the input directory here to match the input directory that you’ll be using to upload data:

 1import os
 2import json
 3from tapipy import actors, util, errors
 4from tapipy.tapis import Tapis
 5from pathlib import Path
 6import dataclasses
 7from typing import Any
 8
 9JOB = Path("/opt/job.json")
10UPLOAD_DIR = '/work2/05369/urrutia/frontera/uploads'
11
12def main() -> None:
13    context = actors.get_context()  # type: ignore
14    #print(json.dumps(context, indent=4))
15
16    # And we'll read in our job definition json to use for submitting a job
17    with open(JOB, "r") as f:
18        job = json.load(f)
19
20    # For people who don't have docker, you can optionally define path/appid in a message to the actor
21    upload_dir = context.message_dict.get('upload_dir')
22    if upload_dir is None:
23        upload_dir=os.path.normpath(UPLOAD_DIR)
24    else:
25        upload_dir = os.path.normpath(upload_dir)
26    appid = context.message_dict.get('appId')
27    if appid is not None:
28        job['appId'] = appid
29
30    # First we need to get our tapis client so we can interact with systems and submit jobs
31    t = actors.get_client()
32    # there is a bug in the base url, this is a simple patch to remove the trailing slash
33    t.base_url = t.base_url.strip('/')
34    # Lets check our upload directory to see all the files that have been uploaded
35    files = t.files.listFiles(systemId='frontera', path=upload_dir)
36    # files.listFiles returns tapipy files objects, we just need a list of paths
37    # so we can use list comprehension to create a list of file paths
38    input_files = [file.path for file in files]
39
40    # We'll use a "submitted.txt" file to keep track of inputs that have been submitted
41    # the first time you run this that file won't exist, so we'll wrap this step in a try/except
42    # so we can create that submitted list the first time the actor runs
43    try:
44        # Try reading in the contents of submitted.txt
45        submitted_file = t.files.getContents(systemId='frontera',path=upload_dir+'/submitted.txt')
46        # files.getContents returns a bytes object of the file contents, 
47        # so we decode it and turn it into a list
48        submitted = [sub.decode() for sub in submitted_file.splitlines()]
49    except errors.NotFoundError as e:
50        # for our first run, we'll add "submitted.txt", so that new file doesn't trigger another job submission
51        submitted = [upload_dir.strip('/')+'/submitted.txt']
52
53    # Another example of list comprehension to create a list of files that are 
54    # in the upload directory, but not in the "submitted.txt" file
55    unsubmitted = [path for path in input_files if path not in submitted]
56    # For any input that hasn't been submitted, lets create a new job
57    for path in unsubmitted:
58        print(path)
59        # sets the input for the job
60        job['parameterSet']['appArgs'][0]['arg'] = '/'+path
61        # set the archive location for the job
62        job['archiveSystemDir'] = upload_dir.replace('uploads','products')
63        # submits the job
64        resp = t.jobs.submitJob(**job)
65        # print the job uuid so we'll have it in the logs
66        print("Submitted Job: {}".format(resp.uuid))
67        # add the input to our "submitted" list
68        submitted.append(path)
69
70    # If this actor doesn't submit any jobs, we don't need to update "submitted.txt"
71    # and we can just close out
72    if unsubmitted == [] and len(submitted) > 1:
73        print("No new uploads detected")
74        exit()
75    # Save a local version of submitted.txt
76    with open(r'submitted.local', 'w') as fp:
77        for item in submitted:
78            # write each item on a new line
79            fp.write("%s\n" % item)
80    # Upload our local, updated version of submitted.txt to the upload dir so we don't resubmit the same job
81    t.upload(source_file_path="submitted.local", system_id="frontera", dest_file_path=upload_dir+"/submitted.txt")
82    print("updated submitted.txt")
83
84
85
86if __name__ == "__main__":
87    main()

And change the name of the app in job.json, so it matches your app id:

 1{
 2    "name": "fastqc",
 3    "appId": "urrutia-fastqc",
 4    "appVersion": "3.0.0",
 5    "archiveSystemDir": "${JobWorkingDir}",
 6    "archiveOnAppError": false,
 7    "parameterSet": {
 8        "appArgs": [
 9            {"name": "inputFastq", "arg": "/work2/05369/urrutia/frontera/inputs/input.fastq"}
10        ],
11        "schedulerOptions": [
12            {
13                "name": "TACC Allocation",
14                "arg": "-A Frontera-Training"
15            }
16        ]
17    },
18    "moduleLoads": [
19        {
20            "moduleLoadCommand": "module load",
21            "modulesToLoad": [
22                "tacc-apptainer"
23            ]
24        }
25    ]
26}

Build and push the actor:

docker build -t $USERNAME/fastqc_actor:0.0.1 .
docker push $USERNAME/fastqc_actor:0.0.1
# or on a M1 chip
docker buildx build --platform linux/amd64 -t $USERNAME/fastqc_actor:0.0.1 --push .

Deploy the Actor

All that’s left is to deploy our reactor:

import json
actor = {
  "image": "$USERNAME/fastqc_actor:0.0.1",
  "stateless": True,
  "token": True,
  "cron": True,
  "cron_schedule": "now + 1 week"
}
t.actors.create_actor(**actor)

# you can also manually trigger an actor with:
# t.actors.send_message(actor_id='$ACTOR_ID', message={"test":"message"})

Warning

If you didn’t build the container, you can use the one I created above, but you’ll need to define upload_dir and appId in a message:

t.actors.send_message(actor_id='$ACTOR_ID', message={"upload_dir":"$PATH","appId":"$APPID"})

You should see a response like:

_links:
executions: https://tacc.tapis.io//v3/actors/B741py883o7Y8/executions
owner: https://tacc.tapis.io//v3/oauth2/profiles/urrutia
create_time: 2023-07-11T15:58:27.624476
cron_next_ex: None
cron_on: False
cron_schedule: None
default_environment:

description:
gid: None
hints: []
id: B741py883o7Y8
image: jurrutia/fastqc_actor:0.0.1
last_update_time: 2023-07-11T15:58:27.624476
link:
log_ex: None
max_cpus: None
max_workers: None
mem_limit: None
mounts: [
container_path: /home/tapis/runtime_files/_abaco_data1
host_path: /home/apim/prod/runtime_files/data1
mode: ro,
container_path: /home/tapis/runtime_files/_abaco_data2
host_path: /home/apim/prod/runtime_files/data2/tacc/urrutia
mode: rw]
name: None
owner: urrutia
privileged: False
queue: default
revision: 1
run_as_executor: False
state:

stateless: True
status: SUBMITTED
status_message:
tasdir: None
token: True
type: none
uid: None
use_container_uid: False
webhook:

Copy your actor id (B741py883o7Y8 in the above example). If you forget the id, you can always list out your actors with t.actors.list_actors().

Test the Actor execution

Now the only thing left to do is to test and see if our upload -> reactor -> app chain is functioning.

Now that we’ve uploaded lets see if our actor was triggered:

t.actors.list_executions(actor_id='$ACTOR_ID')

The response should look like:

_links:
owner: https://tacc.tapis.io//v3/oauth2/profiles/urrutia
actor_id: 8qB5RA33VMlN6
api_server: https://tacc.tapis.io/
executions: [
finish_time: Wed, 12 Jul 2023 20:45:54 GMT
id: jeOJBJyObDWo
message_received_time: Wed, 12 Jul 2023 20:45:38 GMT
start_time: Wed, 12 Jul 2023 20:45:53 GMT
status: COMPLETE]
owner: urrutia
total_cpu: 88536648
total_executions: 1
total_io: 180
total_runtime: 1

If you want to see the logs from your actor execution you can run with one of the execution ids from the response above:

t.actors.get_execution_logs(actor_id='$ACTOR_ID', execution_id='$EXECUTION_ID')

For the first execution the logs should read:

_links:
execution: https://tacc.tapis.io//v3/actors/8qB5RA33VMlN6/executions/8zPjejRxNX3W1
owner: https://tacc.tapis.io//v3/oauth2/profiles/urrutia
logs: work2/05369/urrutia/frontera/uploads/input.fastq
Submitted Job: ea15ef53-cfe9-4191-9672-38cf5f7af970-007
updated submitted.txt

We can inspect or jobs record with:

t.jobs.getJob(jobUuid='$JOB_UUID')

And finally let’s check our products folder to see if we have the output we expect:

ls $WORK/products

Congratulations, you successfully automated part of your workflow with Tapis! But there is no reason to stop here, you can add a notification to your FastQC jobs to trigger a new reactor (and perform an alignment maybe?), and build an entirely automated workflow by chaining together reactors and apps.

Once you’re finished please remove the cron or delete your actor:

# update to remove cron
t.actors.update_actor(actor_id='$ACTOR_ID', image="$DOCKER_USERNAME/$REPO:$TAG", cron=False)
# Delete
t.actors.delete_actor(actor_id='$ACTOR_ID')