About this blueprint
This flow will:
- Query data from a Druid server
- Convert the result to a CSV file
- Upload the CSV file to S3
- Create a table in Redshift if it does not exist
- Insert data from the CSV file into the Redshift table
To set up Apache Druid locally, follow the instructions in the official documentation. This flow uses the example Druid dataset `wikipedia`.
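Once Druid is running and the dataset is loaded, you can optionally confirm that the `wikipedia` datasource is queryable before running the flow. A minimal sanity-check query, run from the Druid console or any client pointed at the same SQL endpoint used below:

```sql
-- Optional sanity check: confirm the wikipedia datasource is loaded and queryable
SELECT COUNT(*) AS row_count,
       MIN(__time) AS first_edit,
       MAX(__time) AS last_edit
FROM wikipedia;
```

The complete flow is shown below.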
```yaml
id: druid_to_redshift
namespace: company.team
tasks:
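  # Query the wikipedia datasource in Druid over the Avatica JDBC endpoint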
  - id: query_druid
    type: io.kestra.plugin.jdbc.druid.Query
    url: jdbc:avatica:remote:url=http://localhost:8888/druid/v2/sql/avatica/;transparent_reconnection=true
    sql: |
      SELECT __time as edit_time, channel, page, user, delta, added, deleted
      FROM wikipedia
    store: true
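  # Convert the query result into a CSV file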
  - id: write_to_csv
    type: io.kestra.plugin.serdes.csv.IonToCsv
    from: "{{ outputs.query_druid.uri }}"
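  # Upload the CSV file to an S3 bucket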
  - id: upload
    type: io.kestra.plugin.aws.s3.Upload
    accessKeyId: "{{ secret('AWS_ACCESS_KEY_ID') }}"
    secretKeyId: "{{ secret('AWS_SECRET_ACCESS_KEY') }}"
    region: eu-central-1
    from: "{{ outputs.write_to_csv.uri }}"
    bucket: <bucket-name>
    key: wikipedia/input/wikipedia.csv
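  # Create the target table in Redshift if it does not already exist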
  - id: create_table
    type: io.kestra.plugin.jdbc.redshift.Query
    url: jdbc:redshift://123456789.eu-central-1.redshift.amazonaws.com:5439/dev
    username: "{{ secret('REDSHIFT_USER') }}"
    password: "{{ secret('REDSHIFT_PASSWORD') }}"
    sql: |
      create table if not exists wikipedia
      (
        edit_time varchar(100),
        channel varchar(100),
        page varchar(1000),
        wiki_user varchar(100),
        edit_delta integer,
        added integer,
        deleted integer
      );
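  # Load the CSV file from S3 into the Redshift table using COPY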
  - id: insert_into_redshift
    type: io.kestra.plugin.jdbc.redshift.Query
    url: jdbc:redshift://123456789.eu-central-1.redshift.amazonaws.com:5439/dev
    username: "{{ secret('REDSHIFT_USER') }}"
    password: "{{ secret('REDSHIFT_PASSWORD') }}"
    sql: |
      COPY wikipedia
      FROM 's3://<bucket-name>/wikipedia/input/wikipedia.csv'
      credentials
      'aws_access_key_id=<access-key>;aws_secret_access_key=<secret-key>'
      IGNOREHEADER 1
      CSV;
```
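The flow references the `AWS_ACCESS_KEY_ID`, `AWS_SECRET_ACCESS_KEY`, `REDSHIFT_USER` and `REDSHIFT_PASSWORD` secrets, so make sure they are configured in your Kestra instance before running it. If you prefer not to embed static AWS keys in the COPY statement, Redshift also accepts an IAM role for authorization. A minimal sketch, assuming a role with read access to the bucket is attached to the cluster (the ARN placeholders below are illustrative):

```sql
COPY wikipedia
FROM 's3://<bucket-name>/wikipedia/input/wikipedia.csv'
IAM_ROLE 'arn:aws:iam::<account-id>:role/<redshift-s3-read-role>'
IGNOREHEADER 1
CSV;
```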