Convert an ION file into Avro.

type: "io.kestra.plugin.serdes.avro.IonToAvro"

Convert a CSV file to the Avro format.
id: divvy_tripdata
namespace: company.team

variables:
  file_id: "{{ execution.startDate | dateAdd(-3, 'MONTHS') | date('yyyyMM') }}"

tasks:
  - id: get_zipfile
    type: io.kestra.plugin.core.http.Download
    uri: "https://divvy-tripdata.s3.amazonaws.com/{{ render(vars.file_id) }}-divvy-tripdata.zip"

  - id: unzip
    type: io.kestra.plugin.compress.ArchiveDecompress
    algorithm: ZIP
    from: "{{ outputs.get_zipfile.uri }}"

  - id: convert
    type: io.kestra.plugin.serdes.csv.CsvToIon
    from: "{{ outputs.unzip.files[render(vars.file_id) ~ '-divvy-tripdata.csv'] }}"

  - id: to_avro
    type: io.kestra.plugin.serdes.avro.IonToAvro
    from: "{{ outputs.convert.uri }}"
    datetimeFormat: "yyyy-MM-dd' 'HH:mm:ss"
    schema: |
      {
        "type": "record",
        "name": "Ride",
        "namespace": "com.example.bikeshare",
        "fields": [
          {"name": "ride_id", "type": "string"},
          {"name": "rideable_type", "type": "string"},
          {"name": "started_at", "type": {"type": "long", "logicalType": "timestamp-millis"}},
          {"name": "ended_at", "type": {"type": "long", "logicalType": "timestamp-millis"}},
          {"name": "start_station_name", "type": "string"},
          {"name": "start_station_id", "type": "string"},
          {"name": "end_station_name", "type": "string"},
          {"name": "end_station_id", "type": "string"},
          {"name": "start_lat", "type": "double"},
          {"name": "start_lng", "type": "double"},
          {
            "name": "end_lat",
            "type": ["null", "double"],
            "default": null
          },
          {
            "name": "end_lng",
            "type": ["null", "double"],
            "default": null
          },
          {"name": "member_casual", "type": "string"}
        ]
      }
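If you would rather not hand-write the Avro schema, the schema property can be left empty and the task will try to infer one from the data (see the schema and row-scanning properties below). A minimal sketch of the same to_avro step relying on inference; the task id and the value 1000 for numberOfRowsToScan are illustrative choices, not defaults:

  - id: to_avro_inferred
    type: io.kestra.plugin.serdes.avro.IonToAvro
    from: "{{ outputs.convert.uri }}"
    datetimeFormat: "yyyy-MM-dd' 'HH:mm:ss"
    # No schema is given, so the task infers one from the data;
    # scanning more rows yields a more precise schema (default is 100).
    numberOfRowsToScan: 1000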
Properties

Source file URI
Pebble expression referencing an Internal Storage URI, e.g. {{ outputs.mytask.uri }}.
Format to use when parsing date
Default value is yyyy-MM-dd[XXX]
Format to use when parsing datetime
Default value is yyyy-MM-dd'T'HH:mm[:ss][.SSSSSS][XXX]
Character to recognize as decimal point (e.g. use ',' for European data).
Default value is '.'
["f","false","disabled","0","off","no",""]Values to consider as False
Try to infer all fields
If true, the task tries to infer all fields using trueValues, falseValues, and nullValues. If false, it infers booleans and nulls only on fields declared in the schema as null or bool (see the example after this property list for how these value lists are set).
Default value is false
["","#N/A","#N/A N/A","#NA","-1.#IND","-1.#QNAN","-NaN","1.#IND","1.#QNAN","NA","n/a","nan","null"]Values to consider as null
Number of rows that will be scanned while inferring the schema. The more rows scanned, the more precise the output schema will be.
Only used when the 'schema' property is empty
Default value is 100
How to handle bad records (e.g., null values in non-nullable fields or type mismatches).
Can be one of: ERROR, WARN or SKIP
Default value is ERROR
The Avro schema associated with the data
If empty, the task will try to infer the schema from the current data; use the 'numberOfRowsToScan' property if needed (an inference example is shown after the main flow above).
Whether to consider a field present in the data but not declared in the schema as an error
Default value is false
Format to use when parsing time
Default value is HH:mm[:ss][.SSSSSS][XXX]
Timezone to use when no timezone can be parsed on the source.
If null, the timezone defaults to UTC. Default value is Etc/UTC (the system timezone)
["t","true","enabled","1","on","yes"]Values to consider as True
Outputs

uri
URI of a temporary result file
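The uri output can be referenced from any downstream task with a Pebble expression, in the same way the example flow chains its tasks. A minimal sketch using the core Log task (any task that accepts an internal storage URI would work the same way):

  - id: report
    type: io.kestra.plugin.core.log.Log
    # outputs.to_avro.uri points at the Avro file produced by the to_avro task above.
    message: "Avro file written to {{ outputs.to_avro.uri }}"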