Blueprints
Extract data, transform it, and load it in parallel to S3 and Postgres — in less than 7 seconds!
About this blueprint
S3 Parallel Ingest Postgres Outputs
EXTRACT: This workflow downloads data from an external API via HTTP GET request and stores the result in Kestra's internal storage in ION format.
TRANSFORM: The extracted file is then serialized to JSON and transformed row-by-row to add a new column.
LOAD: The final result is ingested in parallel to S3 and Postgres.
yaml
id: api_json_to_postgres
namespace: company.team
tasks:
- id: download
type: io.kestra.plugin.core.http.Download
uri: https://gorest.co.in/public/v2/users
- id: ion
type: io.kestra.plugin.serdes.json.JsonToIon
from: "{{ outputs.download.uri }}"
newLine: false
- id: json
type: io.kestra.plugin.serdes.json.IonToJson
from: "{{ outputs.ion.uri }}"
- id: add_column
type: io.kestra.plugin.scripts.jython.FileTransform
from: "{{ outputs.json.uri }}"
script: |
from datetime import datetime
logger.info('row: {}', row)
row['inserted_at'] = datetime.utcnow()
- id: parallel
type: io.kestra.plugin.core.flow.Parallel
tasks:
- id: postgres
type: io.kestra.plugin.core.flow.Sequential
tasks:
- id: final_csv
type: io.kestra.plugin.serdes.csv.IonToCsv
from: "{{ outputs.add_column.uri }}"
header: true
- id: create_table
type: io.kestra.plugin.jdbc.postgresql.Query
url: jdbc:postgresql://host.docker.internal:5432/
username: postgres
password: qwerasdfyxcv1234
sql: |
CREATE TABLE IF NOT EXISTS public.raw_users
(
id int,
name VARCHAR,
email VARCHAR,
gender VARCHAR,
status VARCHAR,
inserted_at timestamp
);
- id: load_data
type: io.kestra.plugin.jdbc.postgresql.CopyIn
url: jdbc:postgresql://host.docker.internal:5432/
username: postgres
password: qwerasdfyxcv1234
format: CSV
from: "{{ outputs.final_csv.uri }}"
table: public.raw_users
header: true
- id: s3
type: io.kestra.plugin.core.flow.Sequential
tasks:
- id: final_json
type: io.kestra.plugin.serdes.json.IonToJson
from: "{{ outputs.add_column.uri }}"
- id: json_to_s3
type: io.kestra.plugin.aws.s3.Upload
from: "{{ outputs.final_json.uri }}"
key: users.json
bucket: kestraio
region: "{{ secret('AWS_DEFAULT_REGION') }}"
accessKeyId: "{{ secret('AWS_ACCESS_KEY_ID') }}"
secretKeyId: "{{ secret('AWS_SECRET_ACCESS_KEY') }}"