Hey @ug0, this is because of how the Ecto SQL sandbox works during tests. Each test process checks out its own database connection. The Task.async_stream call spawns new processes, and Ecto doesn't know which connection those processes should use. You can solve this by telling Ecto which process owns the connection:
stream = build_attrs_stream()
parent = self()

Repo.transaction(fn ->
  stream
  |> Stream.chunk_every(5000)
  |> Task.async_stream(fn batch ->
    {n, _} = Repo.insert_all(Assignment, batch, caller: parent)
    n
  end, ordered: false)
  |> Enum.reduce(0, fn {:ok, n}, acc -> acc + n end)
end)
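The only change here is the addition of caller: parent to the options of the Repo.insert_all call, where parent is the pid of the test process, captured before the tasks are spawned.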
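
If it helps to see why the lookup fails without that option, here is a minimal sketch in plain Elixir (no Ecto or database involved, and the names parent and task_pids are just for illustration). It only shows that every function passed to Task.async_stream runs in a process other than the one that started the stream, which is why a connection checked out by the test process is not found automatically:

```elixir
# Plain Elixir sketch: each async_stream task runs in its own process.
parent = self()

task_pids =
  1..3
  |> Task.async_stream(fn _ -> self() end, ordered: false)
  |> Enum.map(fn {:ok, pid} -> pid end)

# None of the task pids matches the pid of the process that started the stream.
IO.inspect(Enum.all?(task_pids, &(&1 != parent)))
# prints true
```

Passing caller: parent hands Ecto the pid it should use when looking up the sandbox connection, instead of the task's own pid.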






















