Note
This page was generated from examples/notebooks/dask_df.ipynb.
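Working with dask and xarray
This example computes STARE spatial indices for the latitude/longitude arrays of a MODIS MOD05_L2 granule in three ways: a single serial pystare call, dask.array.map_blocks, and xarray.apply_ufunc. It then loads the indices, together with one data band, into a dask DataFrame indexed by STARE index.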
[1]:
import datetime

import dask.array
import dask.dataframe
import dask.distributed
import netCDF4
import numpy
import pandas
import pystare
import xarray
from pyhdf.SD import SD
[2]:
# Start a local dask.distributed cluster; creating a Client registers it as the
# default scheduler for the dask/xarray computations later in this notebook.
N_WORKERS = 4
client = dask.distributed.Client(n_workers=N_WORKERS)
[3]:
# Open the MOD05_L2 granule through pyhdf's HDF4 SD interface and read the
# geolocation datasets as float64 (Longitude first, then Latitude).
file_path = '../tests/data/granules/MOD05_L2.A2019336.0000.061.2019336211522.hdf'
hdf = SD(file_path)
lon, lat = (hdf.select(sds_name).get().astype(numpy.double)
            for sds_name in ('Longitude', 'Latitude'))
[ ]:
# Baseline: compute STARE indices for the whole granule in a single pystare call
# (no dask) and time it. The result is stored as `sids_serial` rather than `stare`
# because the Dask section below defines a function named `stare`, which would
# silently replace this array.
start = datetime.datetime.now()
sids_serial = pystare.from_latlon_2d(lat=lat, lon=lon, adapt_level=True)
print(datetime.datetime.now() - start)
Dask
[5]:
# Stack latitude and longitude into one (2, rows, cols) array so every dask block
# carries both coordinates for the same pixels. Each block holds both coordinates,
# 500 rows, and the full row width (-1), so the chunking does not hard-code the
# granule's column count.
coords = numpy.stack([lat, lon])
coords_d = dask.array.from_array(coords, chunks=(2, 500, -1))
coords_d
[5]:
|
[6]:
def stare(coords):
    """Compute STARE indices for one block of stacked coordinates.

    ``coords[0]`` is taken as latitude and ``coords[1]`` as longitude, matching
    the order in which ``coords`` is stacked above. Returns whatever
    ``pystare.from_latlon_2d`` produces for those arrays with ``adapt_level=True``.
    """
    block_lat = coords[0]
    block_lon = coords[1]
    return pystare.from_latlon_2d(lat=block_lat, lon=block_lon, adapt_level=True)
[7]:
# Apply `stare` to every (2, rows, cols) block. drop_axis=[0] removes the lat/lon
# stacking axis, so each output block is (rows, cols). Declaring the output chunks
# from the input's row/column chunks keeps the lazy array's shape metadata in step
# with what `stare` actually returns for each block.
sids_blocks = coords_d.map_blocks(stare,
                                  drop_axis=[0],
                                  chunks=coords_d.chunks[1:],
                                  dtype='int64')
sids_map_blocks = sids_blocks.compute()
Xarray ufunc
[8]:
# Wrap both coordinate arrays as dask-backed xarray DataArrays, chunked into
# 500-row blocks along the 'x' dimension.
lat_x, lon_x = (xarray.DataArray(coord, dims=['x', 'y']).chunk({'x': 500})
                for coord in (lat, lon))
[9]:
# Same computation via xarray.apply_ufunc: with dask='parallelized', xarray maps
# pystare.from_latlon_2d over each dask chunk of lat_x / lon_x. adapt_level=True is
# passed so this method uses the same setting as the serial and map_blocks cells.
start = datetime.datetime.now()
sids_lazy = xarray.apply_ufunc(pystare.from_latlon_2d,
                               lat_x,
                               lon_x,
                               kwargs={'adapt_level': True},
                               dask='parallelized',
                               output_dtypes=[numpy.int64])
# Converting to a numpy array forces the lazy dask graph to run, so the elapsed
# time printed below includes the actual computation.
sids = numpy.asarray(sids_lazy)
print(datetime.datetime.now() - start)
0:00:01.039442
[10]:
# Display the STARE index array produced by the apply_ufunc cell; numpy's repr
# elides the middle rows and columns of large arrays with "...".
sids
[10]:
array([[4298473764500464809, 4298458168380511209, 4297394569014717897,
..., 3604325910693007273, 3604468594879342953,
3604495833162833193],
[4298462872969244297, 4298459225563237225, 4297297422977447753,
..., 3604330264741384009, 3604471380516185641,
3604465738696115433],
[4298462873435275369, 4298459227962358473, 4297297429637206121,
..., 3604322952727773225, 3604471381825883401,
3604465733841987657],
...,
[3652144132972193481, 3650323462937407913, 3650325177740030185,
..., 3727730728598789545, 3727841631302055049,
3727831398613792009],
[3652144129926505097, 3650323400334252041, 3650325178786309321,
..., 3727730732960989609, 3727841627078009577,
3727831398032615625],
[3652167198498770729, 3652159322973158121, 3650318911383240361,
..., 3727838256925064969, 3727843063731949801,
3727853163225616425]])
Write sidecar
[11]:
# Persist the STARE indices as a minimal netCDF4 "sidecar" file. The `with` block
# closes the dataset even if a write fails. The dimension and variable names are
# illustrative; NOTE(review): confirm them against whatever sidecar naming
# convention downstream STARE tools expect.
with netCDF4.Dataset('test.nc', 'w', format='NETCDF4') as rootgrp:
    rootgrp.createDimension('x', sids.shape[0])
    rootgrp.createDimension('y', sids.shape[1])
    stare_var = rootgrp.createVariable('STARE_index', 'i8', ('x', 'y'))
    stare_var[:] = sids
Dask DataFrame
[12]:
# Read the infrared water-vapour band as float64. The next cell pairs it with the
# STARE indices by flat position, and those indices were computed from `lat`/`lon`,
# so the band must have the same 2-D shape as the geolocation arrays. `lat` from
# the geolocation cell is reused as-is; it does not need to be re-read here.
band1 = hdf.select('Water_Vapor_Infrared').get().astype(numpy.double)
assert band1.shape == lat.shape, (
    f"band shape {band1.shape} does not match geolocation shape {lat.shape}")
[13]:
# Pair every pixel's STARE index with its band value, then hand the table to dask
# in 4 partitions. set_index returns a new frame partitioned and sorted by `stare`
# rather than modifying `ddf`, so keep it under its own name instead of discarding it.
df = pandas.DataFrame({'stare': sids.flatten(), 'band1': band1.flatten()})
ddf = dask.dataframe.from_pandas(df, npartitions=4)
ddf_by_stare = ddf.set_index('stare')
ddf_by_stare
[13]:
Dask DataFrame Structure:
| band1 | |
|---|---|
| npartitions=4 | |
| 3604081108103418377 | float64 |
| 3618019075911588075 | ... |
| 3661533202196794217 | ... |
| 3736859491218877322 | ... |
| 4298544093115426153 | ... |
Dask Name: sort_index, 7 graph layers
[ ]: