Skip to content

Commit a8a5ab8

Browse files
committed
added window functions
1 parent 6514702 commit a8a5ab8

8 files changed

+962
-4
lines changed

Diff for: README.md

+2
Original file line numberDiff line numberDiff line change
@@ -20,3 +20,5 @@ Disclaimer: The term vectorization is also used for talking about using SIMD bas
2020
- [vectorization05a.ipynb](vectorization05a.ipynb): generating SQL and dataframe transformation code with one syntax
2121
- [vectorization05b.ipynb](vectorization05b.ipynb): generating SQL and dataframe transformation code with one syntax - an example pipeline
2222
- [vectorization06.ipynb](vectorization06.ipynb): many ways to describe data transformations in python
23+
- [vectorization07.ipynb](vectorization07.ipynb): aggregation functions
24+
- [vectorization08.ipynb](vectorization08.ipynb): window functions

Diff for: table_of_contents.ipynb

+3-1
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,9 @@
2424
"- [vectorization04.ipynb](vectorization04.ipynb): defining a data pipeline\n",
2525
"- [vectorization05a.ipynb](vectorization05a.ipynb): generating SQL and dataframe transformation code with one syntax\n",
2626
"- [vectorization05b.ipynb](vectorization05b.ipynb): generating SQL and dataframe transformation code with one syntax - an example pipeline\n",
27-
"- [vectorization06.ipynb](vectorization06.ipynb): many ways to describe data transformations in python\n"
27+
"- [vectorization06.ipynb](vectorization06.ipynb): many ways to describe data transformations in python\n",
28+
"- [vectorization07.ipynb](vectorization07.ipynb): aggregation functions\n",
29+
"- [vectorization08.ipynb](vectorization08.ipynb): window functions"
2830
],
2931
"metadata": {
3032
"collapsed": false

Diff for: vectorization06.ipynb

+9
Original file line numberDiff line numberDiff line change
@@ -573,6 +573,15 @@
573573
"metadata": {
574574
"collapsed": false
575575
}
576+
},
577+
{
578+
"cell_type": "markdown",
579+
"source": [
580+
"Next: [vectorization07.ipynb](vectorization07.ipynb): aggregation functions"
581+
],
582+
"metadata": {
583+
"collapsed": false
584+
}
576585
}
577586
],
578587
"metadata": {

Diff for: vectorization06.py

+1
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
left_join,
1111
mutate,
1212
select, alias,
13+
build_query,
1314
)
1415
from pydiverse.transform.eager import PandasTableImpl
1516
from pydiverse.transform.lazy import SQLTableImpl

Diff for: vectorization07.ipynb

+10-1
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030
"from pydiverse.pipedag import Flow, Stage, Table, materialize\n",
3131
"from pydiverse.transform import λ\n",
3232
"from pydiverse.transform.core.verbs import (\n",
33-
" mutate, select, alias, group_by, summarise, arrange,\n",
33+
" mutate, alias, group_by, summarise, arrange, build_query,\n",
3434
")\n",
3535
"from pydiverse.transform.eager import PandasTableImpl\n",
3636
"from pydiverse.transform.lazy import SQLTableImpl\n",
@@ -374,6 +374,15 @@
374374
"start_time": "2023-08-16T17:00:52.328441425Z"
375375
}
376376
}
377+
},
378+
{
379+
"cell_type": "markdown",
380+
"source": [
381+
"Next: [vectorization08.ipynb](vectorization08.ipynb): window functions"
382+
],
383+
"metadata": {
384+
"collapsed": false
385+
}
377386
}
378387
],
379388
"metadata": {

Diff for: vectorization07.py

+1-2
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
from pydiverse.pipedag import Flow, Stage, Table, materialize
88
from pydiverse.transform import λ
99
from pydiverse.transform.core.verbs import (
10-
mutate, select, alias, group_by, summarise, arrange,
10+
mutate, alias, group_by, summarise, arrange, build_query,
1111
)
1212
from pydiverse.transform.eager import PandasTableImpl
1313
from pydiverse.transform.lazy import SQLTableImpl
@@ -132,4 +132,3 @@ def get_pipeline():
132132
flow = get_pipeline()
133133
result = flow.run()
134134
assert result.successful
135-
flow.run(flow["t2_transformed_data"]["task_transform_sql"])

Diff for: vectorization08.ipynb

+800
Large diffs are not rendered by default.

Diff for: vectorization08.py

+136
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,136 @@
1+
import pandas as pd
2+
import polars as pl
3+
import sqlalchemy as sa
4+
import ibis
5+
from ibis import _ as col
6+
import pydiverse.transform as pdt
7+
from pydiverse.pipedag import Flow, Stage, Table, materialize
8+
from pydiverse.transform import λ
9+
from pydiverse.transform.core.functions import row_number
10+
from pydiverse.transform.core.verbs import (
11+
mutate, alias, arrange, build_query,
12+
)
13+
from pydiverse.transform.eager import PandasTableImpl
14+
from pydiverse.transform.lazy import SQLTableImpl
15+
16+
17+
@materialize(version="1.0.0")
18+
def read_input_data():
19+
titanic = pd.read_csv(
20+
'https://raw.githubusercontent.com/mwaskom/seaborn-data/master/titanic.csv'
21+
)
22+
return Table(titanic, name="titanic")
23+
24+
25+
@materialize(input_type=pd.DataFrame, version="win1.0.0")
26+
def task_pandas(titanic: pd.DataFrame):
27+
titanic.sort_values("fare", inplace=True)
28+
return (
29+
titanic
30+
.assign(idx=range(len(titanic)), diff_price=titanic.fare.diff())
31+
)
32+
33+
34+
@materialize(input_type=pl.DataFrame, version="win1.0.0")
35+
def task_polars(titanic: pl.DataFrame):
36+
return (
37+
titanic.sort("fare")
38+
.with_columns(idx=pl.Series(range(len(titanic))), diff_price=pl.col("fare").diff())
39+
)
40+
41+
42+
@materialize(input_type=PandasTableImpl, version="win1.0.1")
43+
def task_transform_df(titanic: pdt.Table):
44+
return (
45+
titanic
46+
>> mutate(
47+
idx=row_number(arrange=[λ.fare]),
48+
diff_price = λ.fare.shift(-1, arrange=[λ.fare])-λ.fare
49+
)
50+
>> arrange(λ.fare)
51+
>> alias("transform_df")
52+
)
53+
54+
55+
@materialize(input_type=SQLTableImpl, lazy=True)
56+
def task_transform_sql(titanic: pdt.Table):
57+
return (
58+
titanic
59+
>> mutate(
60+
idx=row_number(arrange=[λ.fare]),
61+
diff_price = λ.fare.shift(-1, arrange=[λ.fare])-λ.fare
62+
)
63+
>> arrange(λ.fare)
64+
>> alias("transform_sql")
65+
)
66+
67+
68+
@materialize(input_type=ibis.api.Table, lazy=True)
69+
def task_ibis(titanic: ibis.api.Table):
70+
w = ibis.window(order_by=col.fare)
71+
return (
72+
titanic
73+
.mutate(idx=ibis.row_number().over(w), diff_price=col.fare.lag(-1).over(w) - col.fare)
74+
.order_by(col.fare)
75+
)
76+
77+
78+
@materialize(input_type=sa.Table, lazy=True)
79+
def task_sqlalchemy(titanic: sa.Table):
80+
return sa.select(
81+
titanic,
82+
sa.func.row_number().over(order_by=titanic.c.fare).label("idx"),
83+
(sa.func.lead(titanic.c.fare, 1, None).over(order_by=titanic.c.fare)
84+
- titanic.c.fare).label("diff_price"),
85+
).select_from(titanic).order_by(titanic.c.fare)
86+
87+
88+
@materialize(input_type=sa.Table, lazy=True)
89+
def task_sql(titanic: sa.Table):
90+
return sa.text(f"""
91+
SELECT tt.*,
92+
ROW_NUMBER() OVER (ORDER BY tt.fare ASC NULLS LAST) AS idx,
93+
LEAD(tt.fare, 1, NULL) OVER (ORDER BY tt.fare ASC NULLS LAST)
94+
- tt.fare AS diff_price
95+
FROM {titanic.original.schema}.{titanic.name} AS tt
96+
ORDER BY tt.fare
97+
""")
98+
99+
100+
@materialize(input_type=pd.DataFrame)
101+
def print_tables(tbls: list[pd.DataFrame]):
102+
from matplotlib import pyplot as plt
103+
fix, axs = plt.subplots(nrows=4, ncols=2)
104+
for tbl, ax in zip(tbls, axs.flatten()):
105+
tbl.sort_values("idx", inplace=True)
106+
print(f"\n\n{tbl}")
107+
ax.plot(tbl.idx, tbl.diff_price.fillna(0))
108+
# limit y axis to 30
109+
ax.set_ylim(0, 30)
110+
plt.show()
111+
112+
113+
def get_pipeline():
114+
tasks = [task_pandas, task_polars, task_transform_df, task_transform_sql,
115+
task_ibis, task_sqlalchemy, task_sql]
116+
with Flow("flow") as flow:
117+
with Stage("t1_raw_input"):
118+
titanic = read_input_data()
119+
120+
with Stage("t2_transformed_data"):
121+
out_tbls = [task(titanic) for task in tasks]
122+
print_tables(out_tbls)
123+
124+
return flow
125+
126+
127+
if __name__ == "__main__":
128+
import logging
129+
from pydiverse.pipedag.util.structlog import setup_logging
130+
131+
setup_logging(log_level=logging.INFO)
132+
133+
flow = get_pipeline()
134+
result = flow.run()
135+
assert result.successful
136+
flow.run(flow["t2_transformed_data"]["task_transform_sql"])

0 commit comments

Comments
 (0)