If you don’t need to persist the state of pending and in-progress tasks, you can get away with just NimbleCSV and Task.async_stream/3, quite easily:
defmodule IngestCSV do
  alias NimbleCSV.RFC4180, as: YourCSV

  def load(path) do
    path
    |> Path.expand()
    |> File.stream!(read_ahead: 524_288)
    # parse_stream/1 skips the header row by default.
    |> YourCSV.parse_stream()
    |> Task.async_stream(
      fn [name, age, address, email] ->
        # Do something with a single CSV record here,
        # possibly handing the result off to another service.
        # If you just want to process all records and receive a new list,
        # use Stream.map and Enum.to_list at the end instead.
        # Placeholder return value so the function body is not empty.
        :ok
      end,
      max_concurrency: 100,
      on_timeout: :kill_task,
      ordered: false
    )
    |> Stream.run()
  end
end
Flow and Broadway are awesome, but for a 2GB file the above will serve you just fine. I’ve processed files of up to 17GB or so, if memory serves.
Again though, if each record, or a batch of records, takes a long time to process, then you’ll need more persistent workers, and Oban is much better suited to that.
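
For what it's worth, here is a rough sketch of the "receive a new list" variant mentioned in the code comments. `CollectSketch`, `process/1`, and the map it returns are made up for illustration, and the rows are plain in-memory lists standing in for what `parse_stream/1` would emit. Treat it as a sketch under those assumptions, not as the one right way to do it.

```elixir
defmodule CollectSketch do
  # Hypothetical per-record work; replace with whatever you do with a row.
  def process([name, age, _address, _email]) do
    %{name: name, age: String.to_integer(age)}
  end

  # Sequential variant from the comment: Stream.map plus Enum.to_list.
  def collect(rows) do
    rows
    |> Stream.map(&process/1)
    |> Enum.to_list()
  end

  # Concurrent variant: Task.async_stream wraps each result as {:ok, value},
  # or emits {:exit, reason} when on_timeout: :kill_task kills a slow task.
  def collect_concurrently(rows) do
    rows
    |> Task.async_stream(&process/1,
      max_concurrency: 100,
      on_timeout: :kill_task,
      ordered: false
    )
    |> Enum.flat_map(fn
      {:ok, result} -> [result]
      # Drop records whose task timed out or crashed.
      {:exit, _reason} -> []
    end)
  end
end
```

Keep in mind that with `ordered: false` the concurrent variant may hand results back in a different order than the input, and that collecting everything into a list gives up the constant-memory benefit of streaming.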