Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 30 additions & 17 deletions python/pyspark/sql/pandas/conversion.py
Original file line number Diff line number Diff line change
Expand Up @@ -861,6 +861,12 @@ def convert_timestamp(value: Any) -> Any:
ser.dt.to_pytimedelta(), index=ser.index, dtype="object", name=ser.name
)

# Handle the 0-column case separately to preserve row count
if len(pdf.columns) == 0:
from pyspark.sql import Row

return [Row()] * len(pdf)

# Convert pandas.DataFrame to list of numpy records
np_records = pdf.set_axis(
[f"col_{i}" for i in range(len(pdf.columns))], axis="columns"
Expand Down Expand Up @@ -998,16 +1004,21 @@ def _create_from_pandas_with_arrow(
step = step if step > 0 else len(pdf)
pdf_slices = (pdf.iloc[start : start + step] for start in range(0, len(pdf), step))

# Create Arrow batches directly using the standalone function
arrow_batches = [
create_arrow_batch_from_pandas(
[(c, t) for (_, c), t in zip(pdf_slice.items(), spark_types)],
timezone=timezone,
safecheck=safecheck,
prefers_large_types=prefers_large_var_types,
)
for pdf_slice in pdf_slices
]
# Handle the 0-column case separately to preserve row count.
# pa.RecordBatch.from_pandas preserves num_rows via pandas index metadata.
if len(pdf.columns) == 0:
arrow_batches = [pa.RecordBatch.from_pandas(pdf_slice) for pdf_slice in pdf_slices]
else:
# Create Arrow batches directly using the standalone function
arrow_batches = [
create_arrow_batch_from_pandas(
[(c, t) for (_, c), t in zip(pdf_slice.items(), spark_types)],
timezone=timezone,
safecheck=safecheck,
prefers_large_types=prefers_large_var_types,
)
for pdf_slice in pdf_slices
]

jsparkSession = self._jsparkSession

Expand Down Expand Up @@ -1074,14 +1085,16 @@ def _create_from_arrow_table(
if not isinstance(schema, StructType):
schema = from_arrow_schema(table.schema, prefer_timestamp_ntz=prefer_timestamp_ntz)

table = _check_arrow_table_timestamps_localize(table, schema, True, timezone).cast(
to_arrow_schema(
schema,
error_on_duplicated_field_names_in_struct=True,
timezone="UTC",
prefers_large_types=prefers_large_var_types,
# Skip cast for 0-column tables as it loses row count
if len(schema.fields) > 0:
table = _check_arrow_table_timestamps_localize(table, schema, True, timezone).cast(
to_arrow_schema(
schema,
error_on_duplicated_field_names_in_struct=True,
timezone="UTC",
prefers_large_types=prefers_large_var_types,
)
)
)

# Chunk the Arrow Table into RecordBatches
chunk_size = arrow_batch_size
Expand Down
32 changes: 32 additions & 0 deletions python/pyspark/sql/tests/test_creation.py
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,38 @@ def test_empty_schema(self):
sdf = self.spark.createDataFrame(data, schema)
assertDataFrameEqual(sdf, data)

@unittest.skipIf(
    not have_pandas or not have_pyarrow,
    pandas_requirement_message or pyarrow_requirement_message,
)
def test_from_pandas_dataframe_with_zero_columns(self):
    """SPARK-55600: createDataFrame must keep the row count of a pandas
    DataFrame that has rows but no columns when an explicit (empty) schema
    is supplied, both with and without Arrow optimization."""
    import pandas as pd

    expected_rows = 5
    empty_schema = StructType([])
    # An index with no columns yields a frame that has rows but zero columns.
    zero_col_pdf = pd.DataFrame(index=range(expected_rows))

    # Arrow path first (fallback disabled so a silent fallback cannot mask a
    # failure), then the non-Arrow path.
    arrow_enabled_conf = {
        "spark.sql.execution.arrow.pyspark.enabled": True,
        "spark.sql.execution.arrow.pyspark.fallback.enabled": False,
    }
    arrow_disabled_conf = {"spark.sql.execution.arrow.pyspark.enabled": False}

    for conf in (arrow_enabled_conf, arrow_disabled_conf):
        with self.sql_conf(conf):
            sdf = self.spark.createDataFrame(zero_col_pdf, schema=empty_schema)
            self.assertEqual(sdf.schema, empty_schema)
            self.assertEqual(sdf.count(), expected_rows)
            self.assertEqual(len(sdf.collect()), expected_rows)


class DataFrameCreationTests(
DataFrameCreationTestsMixin,
Expand Down