Problem with testing bulk inserts with Task.async_stream

Hey @ug0, this is because of how the Ecto SQL sandbox works during tests: each test process checks out its own database connection. `Task.async_stream` runs your function in new processes, and those processes don't own a connection, so Ecto doesn't know which one they should use. You can solve this by giving Ecto a little more information:

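```elixir
stream = build_attrs_stream()

# Capture the calling (test) process's pid before spawning tasks;
# self() inside the task function would return the task's own pid.
parent = self()

# Each task returns its inserted-row count; Enum.reduce sums them.
Repo.transaction(fn ->
  stream
  |> Stream.chunk_every(5000)
  |> Task.async_stream(
    fn batch ->
      # caller: parent lets this task use the connection owned by parent
      {n, _} = Repo.insert_all(Assignment, batch, caller: parent)
      n
    end,
    ordered: false
  )
  |> Enum.reduce(0, fn {:ok, n}, acc -> acc + n end)
end)
```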

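The only change is passing `caller: parent` as an option to `Repo.insert_all/3`. With it, each task process is allowed to use the connection owned by `parent` (the test process) instead of looking for a connection of its own.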
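The batching and summing part of the pipeline can be tried on its own, without a database. This is a minimal sketch under stated assumptions: `BatchSketch` and `fake_insert_all/1` are hypothetical names, and `fake_insert_all/1` only mimics the `{count, nil}` tuple that `Repo.insert_all/3` returns when no `:returning` option is given.

```elixir
defmodule BatchSketch do
  # Hypothetical stand-in for Repo.insert_all/3 (no database involved):
  # returns {number_of_rows, nil} like insert_all without :returning.
  def fake_insert_all(batch), do: {length(batch), nil}

  # Splits the input into chunks, "inserts" each chunk in its own task,
  # and sums the per-chunk counts.
  def insert_in_batches(enumerable, chunk_size) do
    enumerable
    |> Stream.chunk_every(chunk_size)
    |> Task.async_stream(
      fn batch ->
        {n, _} = fake_insert_all(batch)
        n
      end,
      ordered: false
    )
    |> Enum.reduce(0, fn {:ok, n}, acc -> acc + n end)
  end
end
```

For example, `BatchSketch.insert_in_batches(1..12_000, 5000)` processes batches of 5000, 5000 and 2000 rows and returns `12000`. With `ordered: false` the counts are summed in whatever order the tasks finish, which is fine here because addition doesn't depend on order.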