In [202]:
import dask
import dask.dataframe as dd
import pandas as pd
import matplotlib.pyplot as plt
import fastparquet
In [203]:
fastparquet.__version__
Out[203]:
'0.4.0'
In [211]:
df = pd.read_pickle("dummy_data2.pkl").drop_duplicates()
In [220]:
df[['ds', 'J']].set_index('ds').plot(figsize=(17,6))
Out[220]:
<matplotlib.axes._subplots.AxesSubplot at 0x7fa9df326ac8>
In [221]:
df["year"] = df["ds"].apply(lambda x: x.timetuple().tm_year)
df["month"] = df["ds"].apply(lambda x: x.timetuple().tm_mon)
df["day"] = df["ds"].apply(lambda x: x.timetuple().tm_mday)
df.head()
Out[221]:
A ds B C D E F G H I J K L year month day
0 27.505333 2020-01-01 03:02:00+00:00 73.566116 73.964256 53.123333 17.191334 16.323334 87.676941 90.279724 0.0 0.0 0.0 0.0 2020 1 1
1 22.346666 2020-01-01 03:07:00+00:00 73.568932 73.968193 51.790001 16.543333 12.056666 87.671593 90.279724 0.0 0.0 0.0 0.0 2020 1 1
2 20.768000 2020-01-01 03:12:00+00:00 73.571846 73.969215 49.723331 17.153334 13.723333 87.984344 90.279724 0.0 0.0 0.0 0.0 2020 1 1
3 20.480000 2020-01-01 03:17:00+00:00 73.574928 73.970695 47.723331 17.007334 13.123333 87.671913 90.279724 0.0 0.0 0.0 0.0 2020 1 1
4 21.702000 2020-01-01 03:22:00+00:00 73.577995 73.978165 50.459332 16.774000 12.523334 87.671913 90.279724 0.0 0.0 0.0 0.0 2020 1 1
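The three integer columns exist only to serve as hive partition keys in the writes below; ds itself stays a regular column. A quick check (a sketch, using the df above) that the keys are coarse enough to partition on:

df[['year', 'month', 'day']].nunique()   # should be low-cardinality, or the dataset shatters into tiny files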
In [222]:
#df = df.set_index("ds")
In [223]:
len(df)
Out[223]:
58995
In [224]:
#shuffle (left disabled: chunks stay contiguous in time)
import math
chunk = 10
datashuffle = df.sort_values('ds').to_dict('records')
#random.shuffle(datashuffle)
size = math.ceil(len(datashuffle) / chunk)
# every chunk after the first starts 3000 rows early, giving a
# 3000-row overlap with its predecessor to exercise dedup on append
dfs = [pd.DataFrame(datashuffle[max(i*size - 3000, 0):size*(i+1)]) for i in range(chunk)]
for i in range(chunk):
    dfs[i].to_pickle(f"p2_{i}.pkl")
In [225]:
[len(i) for i in dfs]
Out[225]:
[5900, 8900, 8900, 8900, 8900, 8900, 8900, 8900, 8900, 8895]
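The slicing above is deliberate: every chunk after the first starts 3000 rows before its nominal boundary, so neighbouring chunks share a 3000-row overlap (hence 5900 rows in chunk 0 versus 8900 in the rest). That overlap is the duplicate data the append() logic below has to reject. A quick sanity check (sketch, assuming dfs from the cell above):

# neighbouring chunks should share exactly 3000 timestamps
len(set(dfs[0]['ds']) & set(dfs[1]['ds']))   # expect 3000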
In [236]:
# initial write: a hive-partitioned dataset under ./data3,
# one directory level per partition key
fastparquet.write(
    filename="./data3",
    data=dfs[0],
    compression='GZIP',
    file_scheme='hive',
    #open_with=myopen,        # hook for s3fs
    partition_on=['year', 'month', 'day'],
    write_index=False,
    #mkdirs=lambda x: True    # for s3fs
)
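With file_scheme='hive' and partition_on, fastparquet lays the dataset out as one directory level per key, roughly data3/year=2020/month=1/day=1/part.0.parquet, plus a _metadata file at the root. To eyeball the layout (sketch):

!find data3 -maxdepth 4 | head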
In [216]:
dfs[0].sort_values("ds").to_parquet(
    path="./data3/",   # 'fname' was deprecated in favour of 'path'
    compression='GZIP',
    engine='fastparquet',
    #append=True,      # first write: nothing on disk to append to yet
    partition_cols=['year', 'month', 'day'],
    index=False
)
In [235]:
!rm -rf data3
In [237]:
def plot():
    df2 = pd.read_parquet("./data3", engine='pyarrow').sort_values("ds")
    fullds = pd.date_range(start='1/1/2020', end='1/7/2020')
    fig, ax = plt.subplots()
    fig.set_size_inches(17, 6)
    # invisible full-range series pins the x-axis to the whole week,
    # so successive plots stay comparable as chunks are appended
    ax.plot(fullds.values, [0]*len(fullds), alpha=0)
    ax.plot(df2["ds"].values, df2["A"].values)
def append(part):
    toadd = pd.read_pickle(f"p2_{part}.pkl")
    # bound the read-back to the new chunk's time span; tzinfo is stripped
    # so the filter values compare against the tz-naive parquet statistics
    start = pd.Timestamp(toadd.iloc[0]['ds'].replace(tzinfo=None))
    end = pd.Timestamp(toadd.iloc[-1]['ds'].replace(tzinfo=None))
    filters = [('ds', '>', start), ('ds', '<', end)]
    ddf = dd.read_parquet("./data3", columns=['ds'], filters=filters, index=False)
    dup = ddf['ds'].compute()
    # keep only timestamps not already on disk, then append the rest
    toadd_new = toadd[~toadd["ds"].isin(dup.tolist())]
    print("dup", len(toadd) - len(toadd_new))
    toadd_new.to_parquet(
        path="./data3/",
        compression='GZIP',
        engine='fastparquet',
        append=True,
        partition_cols=['year', 'month', 'day'],
        index=False
    )
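Two details in append() are worth noting. First, dask's filters= prunes at partition/row-group granularity rather than row by row, so dup can contain timestamps outside the strict bounds; the exact deduplication is the isin() step that follows. Second, reading back only columns=['ds'] keeps that round trip cheap. A standalone sketch of the read-back step (assuming p2_1.pkl from above), using min()/max() instead of iloc[0]/iloc[-1] so the chunk need not be pre-sorted:

toadd = pd.read_pickle("p2_1.pkl")
lo = toadd['ds'].min().tz_localize(None)   # naive bounds, cf. replace(tzinfo=None) above
hi = toadd['ds'].max().tz_localize(None)
dd.read_parquet("./data3", columns=['ds'],
                filters=[('ds', '>', lo), ('ds', '<', hi)]).compute()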
In [238]:
plot()
In [239]:
append(1)
plot()
dup 3000
In [240]:
append(9)
plot()
dup 0
In [241]:
append(7)
plot()
dup 0
In [242]:
append(5)
plot()
dup 0
In [243]:
append(2)
append(3)
append(4)
append(6)
append(8)
plot()
dup 3000
dup 3000
dup 6000
dup 6000
dup 6000
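The dup counts follow directly from the overlap construction: the chunks arrive out of order (0 written first, then 1, 9, 7, 5, then 2, 3, 4, 6, 8), and each chunk shares 3000 rows with each neighbour already on disk. Chunks 2 and 3 each meet one written neighbour (dup 3000), while chunks 4, 6 and 8 meet both (dup 6000). A final consistency check (sketch):

final = pd.read_parquet("./data3", engine='pyarrow')
print(len(final), final['ds'].is_unique)   # expect 58995, True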