Note

This page was generated from examples/notebooks/dask_df.ipynb.

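# Working with Dask and xarray

This notebook computes STARE index values for the geolocation grid of a MODIS MOD05 granule in three ways: a single serial `pystare.from_latlon_2d` call, `dask.array.map_blocks`, and `xarray.apply_ufunc`. It then loads the indices, together with a band's values, into a Dask DataFrame indexed by STARE value.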

[1]:
import datetime

import dask.array
import dask.dataframe
import dask.distributed
import netCDF4
import numpy
import pandas
import pystare
import xarray
from pyhdf.SD import SD
[2]:
# Start a local Dask cluster with four worker processes.
n_workers = 4
client = dask.distributed.Client(n_workers=n_workers)
[3]:
file_path = '../tests/data/granules/MOD05_L2.A2019336.0000.061.2019336211522.hdf'
hdf = SD(file_path)

# Geolocation grids, cast to float64 (numpy.double) before being handed to pystare.
lon, lat = (hdf.select(sds_name).get().astype(numpy.double)
            for sds_name in ('Longitude', 'Latitude'))
[ ]:
# Baseline: compute STARE index values for the whole granule in a single
# serial call and print the wall-clock time, for comparison with the dask
# and xarray variants below. The result gets its own name: `stare` is
# redefined as a function in the Dask section, which would shadow it.
start = datetime.datetime.now()
sids_serial = pystare.from_latlon_2d(lat=lat,
                                     lon=lon,
                                     adapt_level=True)
print(datetime.datetime.now() - start)

## Dask

[5]:
# Stack lat/lon into one (2, rows, cols) array so every dask block carries
# both coordinates for the same pixels. Chunk along rows only (500 rows per
# block); -1 keeps the full column extent in each block regardless of the
# granule's width, instead of a hard-coded width.
coords = numpy.array([lat, lon])
coords_d = dask.array.from_array(coords, chunks=(2, 500, -1))
coords_d
[5]:
Array Chunk
Bytes 1.67 MiB 1.67 MiB
Shape (2, 406, 270) (2, 406, 270)
Count 1 Graph Layer 1 Chunks
Type float64 numpy.ndarray
270 406 2
[6]:
def stare(coords):
    """Return STARE index values for one block of stacked coordinates.

    ``coords[0]`` holds latitudes and ``coords[1]`` longitudes, the order in
    which ``coords`` is built with ``numpy.array([lat, lon])``. The leading
    axis is consumed, so the result has the shape of a single coordinate grid.
    """
    lat_block = coords[0]
    lon_block = coords[1]
    return pystare.from_latlon_2d(lat=lat_block, lon=lon_block, adapt_level=True)
[7]:
# Apply `stare` to every block. It consumes the leading lat/lon axis
# (drop_axis=[0]), so each output block has exactly the row/column chunking
# of the input. Declare those chunks rather than a hard-coded shape, which
# would give the lazy array a wrong shape/chunk layout.
sids_lazy = coords_d.map_blocks(stare, drop_axis=[0],
                                chunks=coords_d.chunks[1:], dtype='int64')
sids_dask = sids_lazy.compute()

## xarray `apply_ufunc`

[8]:
# Wrap both geolocation grids as labelled DataArrays, chunked 500 rows at a time along 'x'.
lat_x, lon_x = (xarray.DataArray(grid, dims=['x', 'y']).chunk({'x': 500})
                for grid in (lat, lon))
[9]:
# Same computation through xarray.apply_ufunc: with dask='parallelized' the
# function is applied to each chunk of lat_x/lon_x. adapt_level=True is passed
# explicitly so this run matches the serial and map_blocks computations above.
start = datetime.datetime.now()
sids_xr = xarray.apply_ufunc(pystare.from_latlon_2d,
                             lat_x,
                             lon_x,
                             kwargs={'adapt_level': True},
                             dask='parallelized',
                             output_dtypes=[numpy.int64])

# Converting to a numpy array forces the lazy graph to compute, so the
# computation itself is inside the timed region.
sids = numpy.array(sids_xr)
print(datetime.datetime.now() - start)
0:00:01.039442
[10]:
# Inspect the STARE index array produced by the apply_ufunc cell (numpy elides the middle of large arrays).
sids
[10]:
array([[4298473764500464809, 4298458168380511209, 4297394569014717897,
        ..., 3604325910693007273, 3604468594879342953,
        3604495833162833193],
       [4298462872969244297, 4298459225563237225, 4297297422977447753,
        ..., 3604330264741384009, 3604471380516185641,
        3604465738696115433],
       [4298462873435275369, 4298459227962358473, 4297297429637206121,
        ..., 3604322952727773225, 3604471381825883401,
        3604465733841987657],
       ...,
       [3652144132972193481, 3650323462937407913, 3650325177740030185,
        ..., 3727730728598789545, 3727841631302055049,
        3727831398613792009],
       [3652144129926505097, 3650323400334252041, 3650325178786309321,
        ..., 3727730732960989609, 3727841627078009577,
        3727831398032615625],
       [3652167198498770729, 3652159322973158121, 3650318911383240361,
        ..., 3727838256925064969, 3727843063731949801,
        3727853163225616425]])

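## Write sidecar

Placeholder: the cell below only creates and closes an empty NetCDF4 file (`test.nc`); nothing is written to it yet.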

[11]:
import netCDF4

# Placeholder sidecar: create an empty NETCDF4 file and close it again.
# TODO: write the STARE index array (`sids`) into this file.
with netCDF4.Dataset('test.nc', "w", format="NETCDF4") as rootgrp:
    pass

## Dask DataFrame

[12]:
# Read the infrared water-vapour SDS from the already-open granule as float64.
# NOTE(review): these are the raw SDS values; any scale_factor / add_offset /
# _FillValue attributes on the SDS are not applied here — confirm that is intended.
band1 = hdf.select('Water_Vapor_Infrared').get().astype(numpy.double)
[13]:
# Pair each pixel's STARE index with its band value. Both grids are flattened
# in the same (C) order, so they must have identical shapes to stay aligned.
assert band1.shape == sids.shape, 'band1 and STARE index grids must align pixel-for-pixel'
df = pandas.DataFrame({'stare': sids.flatten(), 'band1': band1.flatten()})
ddf = dask.dataframe.from_pandas(df, npartitions=4)

# set_index returns a new frame rather than modifying ddf; keep a handle to it.
ddf_by_stare = ddf.set_index('stare')
ddf_by_stare
[13]:
Dask DataFrame Structure:
band1
npartitions=4
3604081108103418377 float64
3618019075911588075 ...
3661533202196794217 ...
3736859491218877322 ...
4298544093115426153 ...
Dask Name: sort_index, 7 graph layers
[ ]: