Conversation


@zhengruifeng zhengruifeng commented Jan 20, 2026

### What changes were proposed in this pull request?

Fix a memory leak that occurs when the output stream is stopped before EOS.

Currently, memory is freed by `processor.close()` at EOS. But when the Python node is followed by a `Limit` node, `processor.close()` is never called at the end of the task, which leaks memory.
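
For clarity, a minimal sketch of the fix's shape, based on the diff excerpt quoted in the review comments below (surrounding code omitted):

```
// Ensure the processor is always closed at end of task, even when a
// downstream operator (e.g. Limit) stops consuming before EOS.
context.addTaskCompletionListener[Unit] { _ =>
  if (processor != null) {
    processor.close()
  }
}
```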

### Why are the changes needed?

```
import pyarrow as pa

spark.conf.set("spark.sql.execution.arrow.maxBytesPerOutputBatch", "3")

def get_size(iterator):
    for batch in iterator:
        if batch.num_rows > 0:
            yield pa.RecordBatch.from_arrays([pa.array([batch.num_rows])], names=['size'])

spark.range(10).mapInArrow(get_size, "size long").limit(1).collect()
```

fails with:

```
26/01/20 17:45:48 ERROR BaseAllocator: Memory was leaked by query. Memory leaked: (12)
Allocator(stdin reader for /Users/ruifeng.zheng/.dev/miniconda3/envs/spark-dev-313/bin/python) 0/12/12/9223372036854775807 (res/actual/peak/limit)

26/01/20 17:45:48 ERROR TaskContextImpl: Error in TaskCompletionListener
java.lang.IllegalStateException: Memory was leaked by query. Memory leaked: (12)
Allocator(stdin reader for /Users/ruifeng.zheng/.dev/miniconda3/envs/spark-dev-313/bin/python) 0/12/12/9223372036854775807 (res/actual/peak/limit)

        at org.apache.arrow.memory.BaseAllocator.close(BaseAllocator.java:504)
        at org.apache.spark.sql.execution.python.PythonArrowOutput$$anon$1.$anonfun$new$1(PythonArrowOutput.scala:81)
        at org.apache.spark.sql.execution.python.PythonArrowOutput$$anon$1.$anonfun$new$1$adapted(PythonArrowOutput.scala:77)
        at org.apache.spark.TaskContext$$anon$1.onTaskCompletion(TaskContext.scala:146)
        at org.apache.spark.TaskContextImpl.$anonfun$invokeTaskCompletionListeners$1(TaskContextImpl.scala:157)
        at org.apache.spark.TaskContextImpl.$anonfun$invokeTaskCompletionListeners$1$adapted(TaskContextImpl.scala:157)
        at org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:212)
        at org.apache.spark.TaskContextImpl.invokeTaskCompletionListeners(TaskContextImpl.scala:157)
        at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:146)
        at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:199)
        at org.apache.spark.scheduler.Task.run(Task.scala:147)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$5(Executor.scala:716)
        at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:86)
        at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:83)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:96)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:719)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
        at java.base/java.lang.Thread.run(Thread.java:1583)
26/01/20 17:45:48 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
org.apache.spark.util.TaskCompletionListenerException: Memory was leaked by query. Memory leaked: (12)
Allocator(stdin reader for /Users/ruifeng.zheng/.dev/miniconda3/envs/spark-dev-313/bin/python) 0/12/12/9223372036854775807 (res/actual/peak/limit)
```

If I remove the `limit(1)`, then

```
spark.range(10).mapInArrow(get_size, "size long").collect()
```

works as expected.

### Does this PR introduce _any_ user-facing change?

Yes, bug fix.

After this fix:

```
In [2]: spark.range(10).mapInArrow(get_size, "size long").limit(1).collect()
Out[2]: [Row(size=1)]
```

### How was this patch tested?

Added tests.

### Was this patch authored or co-authored using generative AI tooling?

No.

@github-actions

JIRA Issue Information

```
=== Bug SPARK-55098 ===
Summary: Vectorized UDFs with output batch control fail with memory leak
Assignee: None
Status: Open
Affected: ["4.2.0","4.1.1"]
```


This comment was automatically generated by GitHub Actions

```
        expected = df.collect()
        self.assertEqual(actual, expected)

    def test_map_in_arrow_with_limit(self):
```
@zhengruifeng (Contributor Author)

Before this fix, this test fails with:

```
ERROR [0.449s]: test_map_in_arrow_with_limit (__main__.MapInArrowWithOutputArrowBatchSlicingBytesTests.test_map_in_arrow_with_limit)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "/Users/ruifeng.zheng/spark/python/pyspark/sql/tests/arrow/test_arrow_map.py", line 66, in test_map_in_arrow_with_limit
    df.mapInArrow(get_size, "size long").limit(1).collect()
    ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~^^
  File "/Users/ruifeng.zheng/spark/python/pyspark/sql/classic/dataframe.py", line 474, in collect
    sock_info = self._jdf.collectToPython()
  File "/Users/ruifeng.zheng/spark/python/lib/py4j-0.10.9.9-src.zip/py4j/java_gateway.py", line 1362, in __call__
    return_value = get_return_value(
        answer, self.gateway_client, self.target_id, self.name)
  File "/Users/ruifeng.zheng/spark/python/pyspark/errors/exceptions/captured.py", line 263, in deco
    return f(*a, **kw)
  File "/Users/ruifeng.zheng/spark/python/lib/py4j-0.10.9.9-src.zip/py4j/protocol.py", line 327, in get_return_value
    raise Py4JJavaError(
        "An error occurred while calling {0}{1}{2}.\n".
        format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o1111.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 17.0 failed 1 times, most recent failure: Lost task 0.0 in stage 17.0 (TID 47) (localhost executor driver): org.apache.spark.util.TaskCompletionListenerException: Memory was leaked by query. Memory leaked: (12)
Allocator(stdin reader for /Users/ruifeng.zheng/.dev/miniconda3/envs/spark-dev-313/bin/python3) 0/12/12/9223372036854775807 (res/actual/peak/limit)

        at org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:267)
        at org.apache.spark.TaskContextImpl.invokeTaskCompletionListeners(TaskContextImpl.scala:157)
        at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:146)
        at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:199)
        at org.apache.spark.scheduler.Task.run(Task.scala:147)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$5(Executor.scala:716)
        at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:86)
        at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:83)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:96)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:719)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
        at java.base/java.lang.Thread.run(Thread.java:1583)
        Suppressed: java.lang.IllegalStateException: Memory was leaked by query. Memory leaked: (12)
Allocator(stdin reader for /Users/ruifeng.zheng/.dev/miniconda3/envs/spark-dev-313/bin/python3) 0/12/12/9223372036854775807 (res/actual/peak/limit)

                at org.apache.arrow.memory.BaseAllocator.close(BaseAllocator.java:504)
                at org.apache.spark.sql.execution.python.PythonArrowOutput$$anon$1.$anonfun$new$1(PythonArrowOutput.scala:84)
                at org.apache.spark.sql.execution.python.PythonArrowOutput$$anon$1.$anonfun$new$1$adapted(PythonArrowOutput.scala:77)
                at org.apache.spark.TaskContext$$anon$1.onTaskCompletion(TaskContext.scala:146)
                at org.apache.spark.TaskContextImpl.$anonfun$invokeTaskCompletionListeners$1(TaskContextImpl.scala:157)
                at org.apache.spark.TaskContextImpl.$anonfun$invokeTaskCompletionListeners$1$adapted(TaskContextImpl.scala:157)
                at org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:212)
                ... 12 more
```
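
For reference, a rough sketch of the added test's likely shape, reconstructed from the traceback above and the repro in the PR description (the exact body in test_arrow_map.py may differ):

```
from pyspark.sql import Row
import pyarrow as pa

def test_map_in_arrow_with_limit(self):
    def get_size(iterator):
        # Yield one single-row batch holding the row count of each input batch.
        for batch in iterator:
            if batch.num_rows > 0:
                yield pa.RecordBatch.from_arrays([pa.array([batch.num_rows])], names=['size'])

    df = self.spark.range(10)
    # Before the fix, limit(1) stopped the output stream before EOS and leaked memory.
    actual = df.mapInArrow(get_size, "size long").limit(1).collect()
    self.assertEqual(actual, [Row(size=1)])
```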



```
context.addTaskCompletionListener[Unit] { _ =>
  if (processor != null) {
    processor.close()
```
@zhengruifeng zhengruifeng Jan 20, 2026


Before this fix, `processor.close()` was only called at EOS:

```
processor.close()
reader.close(false)
allocator.close()
// Reach end of stream. Call `read()` again to read control data.
read()
```

When EOS is not reached, this cleanup never runs, and `allocator.close()` in the task completion listener then raises the memory leak failure.
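
For context, a rough sketch of the pre-existing task completion listener where the failure surfaces (shape assumed from the stack trace at PythonArrowOutput.scala:81; not the exact source):

```
context.addTaskCompletionListener[Unit] { _ =>
  // If EOS was never reached, Arrow buffers are still allocated,
  // so closing the allocator detects the leak and throws.
  allocator.close()
}
```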

```
  prevVectors.foreach(_.close())
  prevRoot.close()
}
super.close()
```
@zhengruifeng zhengruifeng Jan 20, 2026


Based on my observation, this line is not necessary to fix this issue, but I feel subclasses should always call it to free the vectors and the root.
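
Read together with the quote above, the override is roughly the following (a sketch; the enclosing guard is an assumption, since the quoted diff omits the opening of the block that the `}` closes):

```
override def close(): Unit = {
  if (prevRoot != null) {  // guard is assumed, not shown in the quoted diff
    prevVectors.foreach(_.close())
    prevRoot.close()
  }
  super.close()
}
```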

@zhengruifeng zhengruifeng changed the title [SPARK-55098][PYTHON] Vectorized UDFs with output batch control fail with memory leak [SPARK-55098][PYTHON] Vectorized UDFs with output batch size control fail with memory leak Jan 20, 2026

HyukjinKwon commented Jan 20, 2026

Merged to master and branch-4.1.

HyukjinKwon pushed a commit that referenced this pull request Jan 20, 2026
…fail with memory leak

Closes #53867 from zhengruifeng/test_output_memory.

Authored-by: Ruifeng Zheng <ruifengz@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
@zhengruifeng zhengruifeng deleted the test_output_memory branch January 20, 2026 23:44