Blueprints

Run a Python script on AWS ECS Fargate with AWS Batch

About this blueprint

AWS Python Task Runner

This flow will execute a Python script on AWS ECS Fargate. This requires you to setup AWS Batch, S3 Bucket in the same region that you want to run AWS Batch Jobs and AWS credentials.

In order to support inputFiles, namespaceFiles, and outputFiles, the AWS Batch task runner currently relies on multi-containers ECS jobs and creates three containers for each job:

  • A before-container that uploads input files to S3.
  • The main container that fetches input files into the {{ workingDir }} directory and runs the task.
  • An after-container that fetches output files using outputFiles to make them available from the Kestra UI for download and preview.

Since we don't know the working directory of the container in advance, we always need to explicitly define the working directory and output directory when using the AWS Batch runner, e.g. use cat {{ workingDir }}/myFile.txt rather than cat myFile.txt.

yaml
id: aws_batch_runner
namespace: company.team
variables:
  compute_environment_arn: arn:aws:batch:us-east-1:123456789:compute-environment/kestra
  job_queue_arn: arn:aws:batch:us-east-1:123456789:job-queue/kestra
  execution_role_arn: arn:aws:iam::123456789:role/ecsTaskExecutionRole
  task_role_arn: arn:aws:iam::123456789:role/ecsTaskRole
tasks:
  - id: send_data
    type: io.kestra.plugin.scripts.python.Script
    containerImage: ghcr.io/kestra-io/pydata:latest
    taskRunner:
      type: io.kestra.plugin.ee.aws.runner.Batch
      region: us-east-1
      accessKeyId: "{{ secret('AWS_ACCESS_KEY_ID') }}"
      secretKeyId: "{{ secret('AWS_SECRET_KEY_ID') }}"
      computeEnvironmentArn: "{{ vars.compute_environment_arn }}"
      jobQueueArn: "{{ vars.job_queue_arn}}"
      executionRoleArn: "{{ vars.execution_role_arn }}"
      taskRoleArn: "{{ vars.task_role_arn }}"
      bucket: kestra-us
    script: |
      import platform
      import socket
      import sys

      print("Hello from AWS Batch and kestra!")

      def print_environment_info():
          print(f"Host's network name: {platform.node()}")
          print(f"Python version: {platform.python_version()}")
          print(f"Platform information (instance type): {platform.platform()}")
          print(f"OS/Arch: {sys.platform}/{platform.machine()}")

          try:
              hostname = socket.gethostname()
              ip_address = socket.gethostbyname(hostname)
              print(f"Host IP Address: {ip_address}")
          except socket.error as e:
              print("Unable to obtain IP address.")


      if __name__ == '__main__':
          print_environment_info()

Script

Batch

More Related Blueprints

New to Kestra?

Use blueprints to kickstart your first workflows.

Get started with Kestra