Snowpark for Pythonを使用してストアドプロシージャーを作成することで、Pythonのライブラリを使った柔軟な集計・分析用処理を実現することができます。
例えば、SQLでは難しいPandasを使ったデータ加工・集計、sklearnを使った回帰分析モデルの構築や統計量の計算なども可能となります。
そのためSnowflakeを使う分析者としては、Snowpark for Pythonをある程度使いこなす必要があると思います。
ただ、Pythonは、SQLやSASに比べると文法が難しく、使い方やお作法を間違えると、正しい集計ができません。この記事では、SnowflakeでSnowpark for PythonやPandas使う際に分析者として知っておきくべき基本知識や注意点等を説明します。
なお本家のSnowparkのマニュアルはこちらにありますので適宜参照ください。
SnowflakeテーブルデータからPandas DataFrameの作成方法
まずは、Snowflake上のデータをPandas DataFrameに格納する方法を説明します。
SnowflakeからPandas DataFrameを作成するのは、sessionオプジェクトのtable()関数を利用して、SnowflakeのDataFrameオブジェクトを作成し、SnowflakeのDataFrameオブジェクトのto_pandas()を呼び出す必要があります。
Snowflake WEBUIで使うコードは以下になります。
CREATE OR REPLACE PROCEDURE test_sp(
)
RETURNS VARIANT
LANGUAGE PYTHON
RUNTIME_VERSION = '3.11'
PACKAGES = ('snowflake-snowpark-python')
HANDLER = 'test_sp_h'
AS
$$
from snowflake.snowpark.files import SnowflakeFile
from snowflake.snowpark.functions import col
import io,sys
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8')
def test_sp_h(session):
#SessionオブジェクトからSnowflakeのDataframeオブジェクトを作成
snow_table=session.table(["MyDB","MyCategory","Mytable"])
#Pandasに変換
df=snow_table.to_pandas()
#変数を指定したPandas変換
df=df_snow_table.select(col(kmk),col(status)).to_pandas()
$$
上記コードの”MyDB”,”MyCategory”,”Mytable”の部分を実際に存在するテーブル名称に変えていただくと、そのテーブルがPandas DataFrameに読み込まれます。
Pandasの基本的な使い方
データの抽出(Query句)
Pandasでデータの抽出・フィルタリングを行うには、データフレームの列に条件を設定する方法とQuery関数を使う方法の2種類あります。
p_w=100
#例1列に条件設定
df_w1=df[df["s"]<=p_w]
#例2 Query関数を使う方法
df_w1=df.query('s <= @p_w',engine='python')
例1と例2は、同じ結果になります。SQLやSASを利用している方であれば、例2のほうが理解しやすいですが、処理速度は例1が早いので例1を使うことをお勧めします。手元のサンプルでは処理時間に5倍近く差が出ていました。
SQL構文に置き換えると以下になります。
create or replace temporary table df_w1 as
select * from df where s < 100;
件数・最大・最小の求め方(agg関数・min・max・count句)
特定の列の件数や最大値、最小値を求める場合はagg関数を利用します。agg関数で複数の集計関数を指定する場合は配列で指定します。
# 例1 集計関数が1つの場合
max=df["s"].agg('max')
# 例2 複数の集計を行う場合。結果はDataFrameで戻される。
z=df["s"].agg(['max','min','count'])
cnt=z['count']
max=z['max']
min=z['min']
# 例3 変数ごとに集計関数を変更する場合
z=df.agg({'Fare':['max','min','count'],'Pclass':['max','count']})
print(z["Fare"]["max"])
一つの変数に一つの集計関数を指定すると戻り値は数字となりますが、変数や集計関数が複数指定すると、戻り値がDataFrame型となります。使い方によって、戻り値が数字またはDataFrameとなるので注意してください。
またこの関数では対象項目の値がNoneの行は集計されないので注意が必要です。countを指定した場合、Noneを除いた件数が計算されます。
SQLの構文に置き換えると以下になります。
//例1
select max(s) into max from df;
//例2
select max(s),min(s),count(s) into cnt,max,min from df;
//例3
select max(Fare),min(Fare),count(Fare),max(Pclass),count(Pclass)
into max1,min1,cnt1,max2,cnt2 from dt;
グループ単位の集計(groupby 句)
グループ毎の集計を行う場合は、groupby句とagg関数を使用します。集計単位をgroupby句で、集計内容をagg関数で指定します。
集計データにNoneの値がある場合、dropna=False オプションを付けないと、None値は集計されません。また、agg()関数では値がNoneの行は集計対象外になるので、countを指定した場合、Noneを除いた件数が計算されます。総件数を求めたい場合は、Noneの行があるかを確認しましょう。
説明だけだと分かりづらいのでコードと実行結果で説明します。
import pandas as pd
# code code_va1 code_val2 の3カラムをもったDataFrameを作成
df=pd.DataFrame({
'code' : ['1','2','3','4','4','1','2','3',None,'2','1','2','3',None],
'code_val1' : [0,0,0,0,0,0,0,0,1,1,0,0,1,0],
'code_val2' : [0,0,1,1,0,0,0,0,1,1,0,0,1,0]
})
#例1CodeにNoneがある場合出力されない
print(df.groupby('code').agg({"code":"count"}))
#例2Noneは出力されるが、件数が0件
print(df.groupby('code',dropna=False).agg({"code":"count"}))
#例3None行も件数も表示される
print(df.groupby('code',dropna=False).agg({"code_val1":"count"}))
例1はNoneが出力されず、例2はNoneは出力されますが件数が0件(正しくは2件)となります。(結果は以下参照)
#例1 CodeにNoneがある場合出力されない
code
code
1 3
2 4
3 3
4 2
#例2 Noneは出力されるが、件数が0件
code
code
1 3
2 4
3 3
4 2
NaN 0
#例3 None行も件数も表示される
code_val1
code
1 3
2 4
3 3
4 2
NaN 2
Code毎の件数を集計したい場合は、例3のようにコードを記載する必要があります。
グループ単位の集計結果を元のDataFrameに紐付け(transform関数)
グループ単位の集計結果を元のDataFrameに紐付ける場合、groupby句とtransform句を使います。集計単位をgroupby句で、集計内容をtransform関数で指定します。
また、agg関数ではcount,maxのように複数の集計内容を指定できましたが、transform関数で指定できる集計内容は1つです。
import pandas as pd
df=pd.DataFrame({
'code' : ['1','2','3','4','4','1','2','3',None,'2','1','2','3',None],
'code_val1' : [0,0,0,0,0,0,0,0,1,1,0,0,1,0],
'code_val2' : [0,0,1,1,0,0,0,0,1,1,0,0,1,0]
})
#例1
df["code_cnt2"]=df.groupby('code', dropna=False)["code_val1"].transform("count")
#例2
df["code_cnt"]=df[["code","code_val1"]].groupby('code', dropna=False).transform("count")
例1、例2の実行結果は同じですが、実行速度は例1が例2より早いです。手元の集計では2倍ぐらい違ったので、例1を使うほうがよいですね!
集計結果のSnowflakeへの保存
集計した結果を保存するには、保存用のPandas DataFrameを作成し、sessionオブジェクトのwrite_pandas関数を呼び出します。保存用のPandas DataFrameは、カラム名とカラムの型を設定する必要があります。
CREATE OR REPLACE PROCEDURE test_sp(
)
RETURNS VARIANT
LANGUAGE PYTHON
RUNTIME_VERSION = '3.11'
PACKAGES = ('snowflake-snowpark-python')
HANDLER = 'test_sp_h'
AS
$$
from snowflake.snowpark.files import SnowflakeFile
from snowflake.snowpark.functions import col
import io,sys
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8')
def test_sp_h(session):
kekka=[[1,2,3],[4,5,6],[7,8,9]]
#保存するデータのDataFrameの作成、カラム名称と型を指定
kekka_df = pd.DataFrame(data=kekka,columns=['col1', 'col2', 'col3'] )
kekka_df=kekka_df.astype({'col1':float,'col2':float,'col3':float})
#Snowflakeのテーブルに保存
session.write_pandas(kekka_df, 'TableName', auto_create_table=True,overwrite=True)
$$
上記コードの’TableName’を変えてお使いください。
なお、サンプルコードは実行の度に以前のデータを削除しますが、overwrite=Trueのオプションを削除してwrite_pandas関数を呼び出すと、レコードが削除されず追記されていきます。
また、一時テーブルへの保存は今のバージョンではできず、永続テーブルでの保存となります。
コメント