Data processing in Parallel (Hackathon project)

If you don’t want to store future and current tasks state you can just get away with NimbleCSV and Task.async_stream/3, very easily:

defmodule IngestCSV do
  alias NimbleCSV.RFC4180, as: YourCSV

  def load(path) do
    path
    |> Path.expand()
    |> File.stream!(read_ahead: 524_288)
    |> YourCSV.parse_stream()
    |> Task.async_stream(fn [name, age, address, email] ->
      # do something with a single CSV record.
      # possibly best to also hand off the results to another service?
      # if you want to just process all records and receive a new list
      # then you should just use Stream.map and Enum.to_list at the end.
    end, max_concurrency: 100, on_timeout: :kill_task, ordered: false)
    |> Stream.run()
  end
end

Flow and Broadway are awesome but for a 2GB file the above will serve you just fine. I’ve processed files up until 17GB or so, if memory serves.

Again though, if each record – or a batch of records – takes more time to process then you’ll need to have more persistent workers where Oban will be much more suited.