Reproducible Science - Automation

Tutorial on automating a data analysis pipeline with Tapis.

Introduction

This tutorial will teach you how to automate your data analysis using Tapis.

Module Learning Objectives

In this example, we will show you how to set up automated analysis that is triggered when a file is uploaded to TACC.

Participants are strongly encouraged to follow along on the command line. After completing this module, participants should be able to:

  • Register your credentials with a Tapis system

  • Build and deploy an Application

  • Trigger an Actor on a regular schedule

  • Use an Actor to submit a job to an Application

We’re going to work backwards: first creating the app, then the actor, and finally uploading a file to submit a job.

Why is this important?

As you develop your computational skills, you will find that these skills are in high demand. Basic operations like moving files, interpreting metadata, initiating scripts, and formatting outputs will take up an inordinate amount of your time, and they are boring. If you can standardize your process for data ingest, you can automate the boring parts of your work and instead devote more time to interpreting your analysis and working on a new, improved version of your pipeline. Moreover, automating analysis standardizes the processing of your data, so six months from now, when your computational results have been verified experimentally, you can look back at what version of the application was run and what the parameters were, and write your methods section accordingly. Instead of having to guess or remember what you did, you can just check the records.

Don’t underestimate the time-saving value of automation! Check out this informative chart from XKCD

https://imgs.xkcd.com/comics/is_it_worth_the_time.png

This module takes about 90 minutes, so if this process shaves 30 seconds off something you do once a week (over two hours saved across five years), or shaves 30 minutes off something you do once a year (2.5 hours over five years), then it’s worth the time investment!

Polling vs. Event-Driven Automation

The two main automation paradigms are polling and event-driven automation. Polling queries the state at regular intervals, providing consistency and ease of monitoring. Polling is particularly useful when dealing with systems that do not have built-in event mechanisms. However, it may unnecessarily consume resources and is less responsive than an event-driven pipeline. On the other hand, event-driven automation allows for instant responses to specific events, minimizing latency and allowing for real-time processing. This makes event-driven systems highly efficient for handling time-sensitive or critical tasks. However, event-driven systems typically have a more involved setup process, and logging/debugging might be difficult for end users who don’t have administrative access to the event handler. Additionally, event-driven systems require careful design and management to ensure proper event handling and maintain system integrity.

In summary, polling provides a consistent and reliable approach to data retrieval, while event-driven automation pipelines offer real-time responsiveness and adaptability. Choosing between the two depends on the specific requirements of the automation task at hand, considering factors such as data frequency, system characteristics, and the importance of real-time processing.
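
To make the distinction concrete, here is a minimal polling loop sketched in Python; check_for_new_files and process are hypothetical stand-ins for whatever state check and action your pipeline needs:

import time

def check_for_new_files():
    # Hypothetical state check: return a list of unprocessed inputs
    return []

def process(path):
    # Hypothetical action: submit a job, move a file, etc.
    print(f"processing {path}")

# A polling loop queries the state at a fixed interval
while True:
    for path in check_for_new_files():
        process(path)
    time.sleep(600)  # poll every 10 minutes; shorter intervals consume more resources

The actor we deploy below follows this same pattern, except the cron scheduler provides the loop and the actor body is the check-and-process step.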

In this example we use a polling-based automation system, but if you’re interested in seeing an event-driven example, see this guide (which uses Tapis V2):

Requirements

Creating an Application

Building a Container

We’re going to take some of what we’ve learned from best practices and put it into, well, practice. To deploy an app, we’ll build a Docker container, push it to Docker Hub, and publish an app definition to Tapis.

Clone this repo locally to get a copy of the app and actor we’ll be deploying:

git clone https://github.com/JoshuaUrrutia/automation_resources_2023.git
cd automation_resources_2023/fastqc_app

Now we can build and push a container that includes fastqc:

docker build -t $USERNAME/fastqc_app:3.0.0 .
docker push $USERNAME/fastqc_app:3.0.0
# or, on an M1 (Apple Silicon) chip
docker buildx build --platform linux/amd64 -t $USERNAME/fastqc_app:3.0.0 --push .

Set Up a Tapipy Client and Push Credentials to the Execution System

If you haven’t already, go ahead and install tapipy on your local machine:

pip install tapipy

Then open up Python and instantiate a client:

# Import the Tapis object
from tapipy.tapis import Tapis

# Log into the Tapis service by providing user/pass and the base url of your tenant. For example, to interact with the tacc tenant --
t = Tapis(base_url='https://tacc.tapis.io',
          username='myuser',
          password='mypass')

# Get tokens that will be used for authentication function calls
t.get_tokens()

# Push your credentials to the public "frontera" system
t.systems.createUserCredential(systemId='frontera', userName='username', password='password')

To check that Tapis is set up correctly, you can run:

t.systems.getSystems()
t.files.listFiles(systemId='frontera', path='.')

and you should see the frontera system and the files present in your root directory.

Deploy an Application

First, let’s update the application ID in app.json, along with the name of the container you just built. If you don’t want to build the container, you can just use the one that I built.

{
  "id": "urrutia-fastqc",
  "version": "3.0.0",
  "jobType": "BATCH",
  "runtime": "SINGULARITY",
  "runtimeOptions": [
    "SINGULARITY_RUN"
  ],
  "description": "A quality control tool for high throughput sequence data.",
  "containerImage": "docker://jurrutia/fastqc_app:3.0.0",
  "jobAttributes": {
    "isMpi": false,
    "archiveOnAppError": false,
    "execSystemId": "frontera",
    "execSystemLogicalQueue": "small",
    "execSystemExecDir": "${JobWorkingDir}",
    "execSystemInputDir": "${JobWorkingDir}",
    "execSystemOutputDir": "${JobWorkingDir}",
    "archiveSystemId": "frontera",
    "archiveSystemDir": "HOST_EVAL($WORK)/jobs/",
    "parameterSet": {
      "appArgs": [
        {"name": "extractFlag", "arg": "--extract ", "inputMode": "FIXED"},
        {"name": "threads", "arg": "--threads 80", "inputMode": "FIXED"},
        {"name": "outdir", "arg": "--outdir .", "inputMode": "FIXED"},
        {"name": "inputFastq", "arg": "path/to/fastq", "inputMode": "REQUIRED"}
      ]
    },
    "fileInputs": []
  }
}
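
If you built your own container, a quick way to personalize the definition is to edit it in Python before deploying; the id and image below are placeholders, so substitute your own:

import json

# Load the stock definition and swap in your own identifiers
with open('app.json', 'r') as f:
    app_def = json.load(f)

app_def['id'] = 'myuser-fastqc'  # placeholder: choose a unique app id
app_def['containerImage'] = 'docker://myuser/fastqc_app:3.0.0'  # placeholder: the image you pushed

# Write the personalized definition back out
with open('app.json', 'w') as f:
    json.dump(app_def, f, indent=2)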

Now that we have our container and our Tapis app definition, we’re ready to register the app with Tapis.

import json
# read our app definition into python as a dictionary
with open('app.json', 'r') as openfile:
  app_def = json.load(openfile)
# Deploy the App
t.apps.createAppVersion(**app_def)
# In the future if you need to update and redeploy the app, you can use the .patchApp method
#t.apps.patchApp(appId=app_def['id'],appVersion=app_def['version'],jobAttributes=app_def['jobAttributes'])

If everything goes smoothly, you should get a printout of your app URL, e.g.:

url: https://tacc.tapis.io/v3/apps/urrutia-fastqc/3.0.0

Congratulations, you’ve deployed a Tapis application!
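
You can also sanity-check the registration by fetching the app back; substitute your own app id:

# Fetch the app definition we just registered and confirm the basics
app = t.apps.getApp(appId='urrutia-fastqc', appVersion='3.0.0')
print(app.id, app.version, app.containerImage)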

Deploying an Actor

What is an actor? See more info in our documentation:

Basically, a Tapis actor is a script that lives in the cloud and does something for you. It’s not for compute-intensive jobs (that’s what apps are for); it’s designed to be quick, responsive, and lightweight.
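
To get a feel for the programming model, here is a minimal “hello world” actor, assuming the same tapipy actors helper that our reactor below uses:

from tapipy import actors

# Every execution receives a context that includes the message that triggered it
context = actors.get_context()
print("Actor received message:", context.raw_message)

Once a script like this is packaged in a container and registered, each message sent to the actor triggers one execution of it in the cloud.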

We’re going to deploy an actor that will detect when a file is uploaded, fill in a Tapis job definition from job.json, and submit that job to our FastQC application.

Create an Upload Folder

Now we’ll create the uploads folder on our compute system. After we deploy our actor, any file that is uploaded here will be analyzed automatically by our FastQC app! Let’s ssh into frontera and create the folder with bash:

ssh $USERNAME@frontera.tacc.utexas.edu
cdw
mkdir uploads
cd uploads
pwd

Make sure to copy this path (e.g. /work2/05369/urrutia/frontera/uploads); we’ll need it when we deploy the actor.

And go ahead and copy this fastq file into your uploads directory:

cp /work2/05369/urrutia/frontera/fastq/input.fastq $WORK/uploads
ls $WORK/uploads

Note

For those deploying mriqc, it’ll be:

mkdir bids
rsync -rlt /work2/05369/urrutia/frontera/bids/ds001_BIDS $WORK/bids

Move to the Actor Directory

We can move to the fastqc_actor directory in the same repository we cloned earlier:

cd ../fastqc_actor

Edit reactor.py, job.json and build container

reactor.py is the script that runs when the actor is invoked. We’ll edit UPLOAD_DIR here to match the input directory that you’ll be using to upload data (the path you copied above):

import os
import json
from pathlib import Path
from tapipy import actors, errors

JOB = Path("/opt/job.json")
UPLOAD_DIR = '/work2/05369/urrutia/frontera/uploads'

def main() -> None:
    context = actors.get_context()  # type: ignore
    #print(json.dumps(context, indent=4))

    # And we'll read in our job definition json to use for submitting a job
    with open(JOB, "r") as f:
        job = json.load(f)

    # For people who don't have docker, you can optionally define path/appid in a message to the actor
    upload_dir = context.message_dict.get('upload_dir')
    if upload_dir is None:
        upload_dir = os.path.normpath(UPLOAD_DIR)
    else:
        upload_dir = os.path.normpath(upload_dir)
    appid = context.message_dict.get('appId')
    if appid is not None:
        job['appId'] = appid

    # First we need to get our tapis client so we can interact with systems and submit jobs
    t = actors.get_client()
    # there is a bug in the base url, this is a simple patch to remove the trailing slash
    t.base_url = t.base_url.strip('/')
    # Let's check our upload directory to see all the files that have been uploaded
    files = t.files.listFiles(systemId='frontera', path=upload_dir)
    # files.listFiles returns tapipy file objects; we just need a list of paths,
    # so we use a list comprehension to create a list of file paths
    input_files = [file.path for file in files]

    # We'll use a "submitted.txt" file to keep track of inputs that have been submitted.
    # The first time you run this that file won't exist, so we wrap this step in a try/except
    # so we can create the submitted list the first time the actor runs
    try:
        # Try reading in the contents of submitted.txt
        submitted_file = t.files.getContents(systemId='frontera', path=upload_dir+'/submitted.txt')
        # files.getContents returns a bytes object of the file contents,
        # so we decode it and turn it into a list
        submitted = [sub.decode() for sub in submitted_file.splitlines()]
    except errors.NotFoundError:
        # on our first run, we add "submitted.txt" itself, so that new file doesn't trigger another job submission
        submitted = [upload_dir.strip('/')+'/submitted.txt']

    # Another list comprehension, to create a list of files that are
    # in the upload directory but not in the "submitted.txt" file
    unsubmitted = [path for path in input_files if path not in submitted]
    # For any input that hasn't been submitted, let's create a new job
    for path in unsubmitted:
        print(path)
        # set the input for the job
        job['parameterSet']['appArgs'][0]['arg'] = '/'+path
        # set the archive location for the job
        job['archiveSystemDir'] = upload_dir.replace('uploads', 'products')
        # submit the job
        resp = t.jobs.submitJob(**job)
        # print the job uuid so we'll have it in the logs
        print("Submitted Job: {}".format(resp.uuid))
        # add the input to our "submitted" list
        submitted.append(path)

    # If this actor didn't submit any jobs, we don't need to update "submitted.txt"
    # and we can just close out
    if unsubmitted == [] and len(submitted) > 1:
        print("No new uploads detected")
        exit()
    # Save a local version of submitted.txt
    with open(r'submitted.local', 'w') as fp:
        for item in submitted:
            # write each item on a new line
            fp.write("%s\n" % item)
    # Upload our local, updated version of submitted.txt to the upload dir so we don't resubmit the same jobs
    t.upload(source_file_path="submitted.local", system_id="frontera", dest_file_path=upload_dir+"/submitted.txt")
    print("updated submitted.txt")


if __name__ == "__main__":
    main()

And change the appId in job.json so it matches your app id:

{
    "name": "fastqc",
    "appId": "urrutia-fastqc",
    "appVersion": "3.0.0",
    "archiveSystemDir": "${JobWorkingDir}",
    "archiveOnAppError": false,
    "parameterSet": {
        "appArgs": [
            {"name": "inputFastq", "arg": "/work2/05369/urrutia/frontera/inputs/input.fastq"}
        ],
        "schedulerOptions": [
            {
                "name": "TACC Allocation",
                "arg": "-A Frontera-Training"
            }
        ]
    },
    "moduleLoads": [
        {
            "moduleLoadCommand": "module load",
            "modulesToLoad": [
                "tacc-apptainer"
            ]
        }
    ]
}
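
Before building the container, it’s worth a quick check that your edited job.json still parses and points at your app:

import json

# Confirm the job definition is valid JSON and references your app
with open('job.json', 'r') as f:
    job = json.load(f)
print(job['appId'], job['appVersion'])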

Build and push the actor:

docker build -t $USERNAME/fastqc_actor:0.0.1 .
docker push $USERNAME/fastqc_actor:0.0.1
# or, on an M1 (Apple Silicon) chip
docker buildx build --platform linux/amd64 -t $USERNAME/fastqc_actor:0.0.1 --push .

Deploy the Actor

All that’s left is to deploy our reactor:

actor = {
  "image": "$USERNAME/fastqc_actor:0.0.1",
  "stateless": True,
  "token": True,
  "cron": True,
  "cron_schedule": "now + 1 week"
}
t.actors.create_actor(**actor)

# you can also manually trigger an actor with:
# t.actors.send_message(actor_id='$ACTOR_ID', message={"test":"message"})

Warning

If you didn’t build the container, you can use the one I created above, but you’ll need to define upload_dir and appId in a message:

t.actors.send_message(actor_id='$ACTOR_ID', message={"upload_dir":"$PATH","appId":"$APPID"})

You should see a response like:

_links:
executions: https://tacc.tapis.io//v3/actors/B741py883o7Y8/executions
owner: https://tacc.tapis.io//v3/oauth2/profiles/urrutia
create_time: 2023-07-11T15:58:27.624476
cron_next_ex: None
cron_on: False
cron_schedule: None
default_environment:

description:
gid: None
hints: []
id: B741py883o7Y8
image: jurrutia/fastqc_actor:0.0.1
last_update_time: 2023-07-11T15:58:27.624476
link:
log_ex: None
max_cpus: None
max_workers: None
mem_limit: None
mounts: [
container_path: /home/tapis/runtime_files/_abaco_data1
host_path: /home/apim/prod/runtime_files/data1
mode: ro,
container_path: /home/tapis/runtime_files/_abaco_data2
host_path: /home/apim/prod/runtime_files/data2/tacc/urrutia
mode: rw]
name: None
owner: urrutia
privileged: False
queue: default
revision: 1
run_as_executor: False
state:

stateless: True
status: SUBMITTED
status_message:
tasdir: None
token: True
type: none
uid: None
use_container_uid: False
webhook:

Copy your actor id (B741py883o7Y8 in the example above). If you forget the id, you can always list your actors with t.actors.list_actors().
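
The actor starts out in SUBMITTED status while its image is pulled; a small polling loop (fittingly, for this module) can wait until it reaches READY before you test. $ACTOR_ID is the id you just copied:

import time

# Poll until the actor image has been pulled and the actor can process messages
while t.actors.get_actor(actor_id='$ACTOR_ID').status != 'READY':
    time.sleep(5)
print("Actor is READY")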

Test the Actor Execution

Now the only thing left to do is to test whether our upload -> reactor -> app chain is functioning.

We already copied input.fastq into the uploads folder, so let’s see if our actor was triggered:

t.actors.list_executions(actor_id='$ACTOR_ID')

The response should look like:

_links:
owner: https://tacc.tapis.io//v3/oauth2/profiles/urrutia
actor_id: 8qB5RA33VMlN6
api_server: https://tacc.tapis.io/
executions: [
finish_time: Wed, 12 Jul 2023 20:45:54 GMT
id: jeOJBJyObDWo
message_received_time: Wed, 12 Jul 2023 20:45:38 GMT
start_time: Wed, 12 Jul 2023 20:45:53 GMT
status: COMPLETE]
owner: urrutia
total_cpu: 88536648
total_executions: 1
total_io: 180
total_runtime: 1

If you want to see the logs from your actor execution, you can run the following with one of the execution ids from the response above:

t.actors.get_execution_logs(actor_id='$ACTOR_ID', execution_id='$EXECUTION_ID')

For the first execution the logs should read:

_links:
execution: https://tacc.tapis.io//v3/actors/8qB5RA33VMlN6/executions/8zPjejRxNX3W1
owner: https://tacc.tapis.io//v3/oauth2/profiles/urrutia
logs: work2/05369/urrutia/frontera/uploads/input.fastq
Submitted Job: ea15ef53-cfe9-4191-9672-38cf5f7af970-007
updated submitted.txt

We can inspect our job’s record with:

t.jobs.getJob(jobUuid='$JOB_UUID')

And finally let’s check our products folder to see if we have the output we expect:

ls $WORK/products

Congratulations, you successfully automated part of your workflow with Tapis! But there’s no reason to stop here: you can add a notification to your FastQC jobs to trigger a new reactor (and perform an alignment, maybe?), and build an entirely automated workflow by chaining together reactors and apps, as sketched below.
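
As a sketch of how that chaining might look: a Tapis job request can carry a subscriptions block that delivers a webhook notification (for example, to another actor’s messages endpoint) when the job changes status. Treat the field names and the actor URL below as assumptions to verify against the Tapis jobs documentation:

# Assumed Tapis v3 job subscription fields -- verify against the jobs docs
job['subscriptions'] = [{
    "description": "notify a downstream actor when this job changes status",
    "enabled": True,
    "eventCategoryFilter": "JOB_NEW_STATUS",
    "deliveryTargets": [{
        "deliveryMethod": "WEBHOOK",
        "deliveryAddress": "https://tacc.tapis.io/v3/actors/$ACTOR_ID/messages"  # placeholder
    }]
}]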

Once you’re finished, please remove the cron schedule or delete your actor:

# update to remove cron
t.actors.update_actor(actor_id='$ACTOR_ID', image="$DOCKER_USERNAME/$REPO:$TAG", cron=False)
# Delete
t.actors.delete_actor(actor_id='$ACTOR_ID')