
Commit 20d3ed1

add pandas api on spark
1 parent 44bbf71 commit 20d3ed1

File tree

.github/workflows/publish-marimo.yml
.gitignore
data_science_tools/pandas_api_on_spark.py

3 files changed, +280 -1 lines changed

.github/workflows/publish-marimo.yml

Lines changed: 1 addition & 0 deletions
@@ -31,6 +31,7 @@ jobs:
       run: |
         uv run marimo export html data_science_tools/polars_vs_pandas.py -o build/data_science_tools/polars_vs_pandas.html --sandbox
         uv run marimo export html llm/pydantic_ai_examples.py -o build/llm/pydantic_ai_examples.html --sandbox
+        uv run marimo export html data_science_tools/pandas_api_on_spark.py -o build/data_science_tools/pandas_api_on_spark.html --sandbox

     - name: Upload Pages Artifact
       uses: actions/upload-pages-artifact@v3

.gitignore

Lines changed: 3 additions & 1 deletion
@@ -146,4 +146,6 @@ dmypy.json
 #hydra
 outputs
 marimo_notebooks
-*.csv
+*.csv
+*.parquet
+*.html

data_science_tools/pandas_api_on_spark.py

Lines changed: 276 additions & 0 deletions

@@ -0,0 +1,276 @@
# /// script
# requires-python = ">=3.11"
# dependencies = [
#     "marimo",
#     "matplotlib==3.10.1",
#     "numpy==1.26.4",
#     "pandas==2.2.3",
#     "plotly==6.0.1",
#     "pyarrow==20.0.0",
#     "pyspark==3.5.5",
#     "scikit-learn==1.6.1",
# ]
# ///

import marimo

__generated_with = "0.13.0"
app = marimo.App(width="medium")


@app.cell(hide_code=True)
def _(mo):
    mo.md(r"""## Motivation""")
    return


@app.cell(hide_code=True)
def _():
    import warnings

    warnings.filterwarnings("ignore", category=FutureWarning)
    return


@app.cell
def _():
    import pandas as pd

    pandas_df = pd.DataFrame({"value": [1, 2, 3, 4, 5]})
    print(pandas_df["value"].mean())
    return pandas_df, pd


@app.cell
def _():
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import avg

    spark = SparkSession.builder.getOrCreate()

    spark_df = spark.createDataFrame([(1,), (2,), (3,), (4,), (5,)], ["value"])
    spark_df.select(avg("value")).show()
    return (spark,)


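@app.cell
def _(ps):
    # Illustrative cell (not part of the original commit): the pandas API on Spark
    # (imported as `ps` in the next section) runs the same pandas-style code on
    # Spark, so the mean above can be computed without the PySpark boilerplate.
    print(ps.Series([1, 2, 3, 4, 5]).mean())
    return

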
@app.cell(hide_code=True)
def _(mo):
    mo.md(r"""## Creating Pandas API on Spark Objects""")
    return


@app.cell
def _():
    import pyspark.pandas as ps

    ps_s = ps.Series([1, 3, 5, 6, 8])
    return (ps,)


@app.cell
def _(ps):
    import numpy as np

    ps_df = ps.DataFrame(
        {"id": np.arange(1, 1_000_001), "value": np.random.randn(1_000_000)}
    )
    return (ps_df,)


@app.cell
def _(pandas_df, ps):
    ps_df_from_pandas = ps.from_pandas(pandas_df)
    return


@app.cell(hide_code=True)
def _(mo):
    mo.md(r"""## Basic Operations""")
    return


@app.cell
def _(ps_df):
    ps_df.describe()
    return


@app.cell
def _(ps_df):
    # Display the summary of the DataFrame
    ps_df.info()
    return


@app.cell
def _(ps_df):
    ps_df.head()
    return


@app.cell
def _(ps_df):
    # Filter rows and drop any NaN values
    filtered_df = ps_df.where(ps_df.value > 0).dropna()
    filtered_df.head()
    return


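@app.cell
def _(ps_df):
    # Illustrative cell (not part of the original commit): other familiar pandas
    # operations, such as sorting, work the same way on a pandas-on-Spark DataFrame.
    ps_df.sort_values("value", ascending=False).head()
    return

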
@app.cell(hide_code=True)
def _(mo):
    mo.md(r"""## GroupBy""")
    return


@app.cell
def _(ps):
    ps_df_2 = ps.DataFrame(
        {"category": ["A", "B", "A", "C", "B"], "value": [10, 20, 15, 30, 25]}
    )
    return (ps_df_2,)


@app.cell
def _(ps_df_2):
    ps_df_2.groupby("category").value.mean()
    return


@app.cell(hide_code=True)
def _(mo):
    mo.md(r"""## Plotting""")
    return


@app.cell
def _(ps_df):
    ps_df["value"].plot.hist()
    return


@app.cell
def _(ps_df_2):
    ps_df_2.plot.bar(x="category", y="value")
    return


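@app.cell
def _(ps):
    # Illustrative cell (not part of the original commit): pandas-on-Spark renders
    # plots with plotly by default; the backend is configurable through the
    # "plotting.backend" option if matplotlib output is preferred.
    ps.get_option("plotting.backend")
    return

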
@app.cell(hide_code=True)
def _(mo):
    mo.md(r"""## Reading And Writing Data""")
    return


@app.cell
def _(ps, ps_df):
    # Note: Spark writes output_data.csv as a directory of partitioned files,
    # which ps.read_csv reads back as a whole
    ps_df.to_csv("output_data.csv", index=False)
    ps.read_csv("output_data.csv").head()
    return


@app.cell
def _(ps, ps_df):
    ps_df.to_parquet("output_data.parquet")
    ps.read_parquet("output_data.parquet").head()
    return


@app.cell(hide_code=True)
def _(mo):
    mo.md(r"""## Using Pandas API on Spark in Conjunction with Regular Pandas""")
    return


@app.cell
def _(ps):
    from sklearn.linear_model import LinearRegression

    # Create a large Pandas API on Spark DataFrame
    large_pdf_df = ps.DataFrame(
        {
            "feature1": range(1_000_000),
            "feature2": range(1_000_000, 2_000_000),
            "target": range(500_000, 1_500_000),
        }
    )
    print(f"Length of the original DataFrame: {len(large_pdf_df):,}")

    # Aggregate the data to a smaller size
    aggregated = large_pdf_df.groupby(large_pdf_df.feature1 // 10000).mean()
    print(f"Length of the aggregated DataFrame: {len(aggregated):,}")

    # Convert to pandas DataFrame
    small_pdf = aggregated.to_pandas()

    # Train a scikit-learn model
    model = LinearRegression()
    X = small_pdf[["feature1", "feature2"]]
    y = small_pdf["target"]
    model.fit(X, y)
    return


@app.cell(hide_code=True)
def _(mo):
    mo.md(r"""## Pandas API on Spark Query Execution Model""")
    return


@app.cell
def _(pandas_df):
    pandas_df["value"] = pandas_df["value"] + 1  # Operation executes immediately
    print(pandas_df)
    return


@app.cell
def _(ps_df):
    # Using Pandas API on Spark
    updated_psdf = ps_df.assign(a=ps_df["value"] + 1)  # Lazy operation
    print(updated_psdf.head())  # Triggers actual computation
    return


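@app.cell
def _(ps_df):
    # Illustrative cell (not part of the original commit): pandas-on-Spark keeps a
    # lazy Spark plan behind each DataFrame; the spark.explain() accessor prints
    # that plan without materializing any rows.
    ps_df.spark.explain()
    return

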
@app.cell(hide_code=True)
def _(mo):
    mo.md(r"""## Pandas API on Spark vs PySpark""")
    return


@app.cell
def _(spark):
    from pyspark.sql.functions import col

    pyspark_df = spark.createDataFrame([(1, 4), (2, 5), (3, 6)], ["col1", "col2"])
    pyspark_df.select((col("col1") + col("col2")).alias("sum")).show()
    return (col,)


@app.cell
def _(ps):
    pandas_spark_df = ps.DataFrame({"col1": [1, 2, 3], "col2": [4, 5, 6]})
    (pandas_spark_df["col1"] + pandas_spark_df["col2"]).head()
    return (pandas_spark_df,)


@app.cell
def _(col, pandas_spark_df):
    # Convert Pandas API on Spark DataFrame to PySpark DataFrame
    spark_native_df = pandas_spark_df.to_spark()

    # Now you can use full PySpark functionality
    spark_native_df.select((col("col1") + col("col2")).alias("sum")).show()
    return


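@app.cell
def _(pandas_spark_df):
    # Illustrative cell (not part of the original commit): the conversion also works
    # in reverse, since a PySpark DataFrame can be brought back into the pandas API
    # on Spark with DataFrame.pandas_api(), so the two APIs can be mixed freely.
    pandas_spark_df.to_spark().pandas_api().head()
    return

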
@app.cell
def _():
    import marimo as mo

    return (mo,)


if __name__ == "__main__":
    app.run()
