Using Kerchunk#

In this notebook, we’ll use a Kerchunk index file to speed up the metadata reading for a large collection of NetCDF files. The actual data will still be in the original NetCDF files.

Kerchunk Background#

In the last notebook, we saw that accessing data from the NetCDF file over the network was slow, in part because it was making a bunch of HTTP requests just to read some metadata that’s scattered around the NetCDF file. With a Kerchunk index file, you get to bypass all that seeking around for metadata: it’s already been extracted into the index file. While that’s maybe not a huge deal for a single NetCDF file, it matters a bunch when you’re dealing with thousands of NetCDF files (1,000 files * 1.5 seconds per file = ~25 minutes just to read metadata).

import adlfs
import xarray as xr
import fsspec
import json
import odc.geo
import rioxarray
import requests
import pyproj
import planetary_computer
import pystac_client
import geopandas

# force xarray to import everything
xr.tutorial.open_dataset("air_temperature");
%%time
m = fsspec.get_mapper(
    "reference://",
    fo="abfs://ciroh/short-range-channel_rt-kerchunk/reference.json",
    remote_options={"account_name": "noaanwm"},
    target_options={"account_name": "noaanwm"},
    skip_instance_cache=True,
)

channel_rt = xr.open_dataset(m, engine="zarr", consolidated=False, chunks={"time": 1})
channel_rt
CPU times: user 580 ms, sys: 59.9 ms, total: 640 ms
Wall time: 945 ms
<xarray.Dataset>
Dimensions:         (time: 6866, feature_id: 2776738, reference_time: 1)
Coordinates:
  * feature_id      (feature_id) float64 101.0 179.0 181.0 ... 1.18e+09 1.18e+09
  * reference_time  (reference_time) datetime64[ns] 2022-06-29
  * time            (time) datetime64[ns] 2022-06-29T01:00:00 ... 2023-04-21T...
Data variables:
    crs             (time) object dask.array<chunksize=(1,), meta=np.ndarray>
    nudge           (time, feature_id) float64 dask.array<chunksize=(1, 2776738), meta=np.ndarray>
    qBtmVertRunoff  (time, feature_id) float64 dask.array<chunksize=(1, 2776738), meta=np.ndarray>
    qBucket         (time, feature_id) float64 dask.array<chunksize=(1, 2776738), meta=np.ndarray>
    qSfcLatRunoff   (time, feature_id) float64 dask.array<chunksize=(1, 2776738), meta=np.ndarray>
    streamflow      (time, feature_id) float64 dask.array<chunksize=(1, 2776738), meta=np.ndarray>
    velocity        (time, feature_id) float64 dask.array<chunksize=(1, 2776738), meta=np.ndarray>
Attributes: (12/19)
    Conventions:                CF-1.6
    NWM_version_number:         v2.2
    TITLE:                      OUTPUT FROM NWM v2.2
    cdm_datatype:               Station
    code_version:               v5.2.0-beta2
    dev:                        dev_ prefix indicates development/internal me...
    ...                         ...
    model_output_type:          channel_rt
    model_output_valid_time:    2022-06-29_01:00:00
    model_total_valid_times:    18
    proj4:                      +proj=lcc +units=m +a=6370000.0 +b=6370000.0 ...
    station_dimension:          feature_id
    stream_order_output:        1

So in about a second, we opened the Kerchunk index file and used it to build our Dataset. Doing that from the raw NetCDF files would have taken hours (6,866 files at ~1.5 seconds each is nearly three hours of metadata reads alone).

You’ll notice we’re using the zarr engine for xarray. That’s just a convenient way to expose Kerchunk-indexed data to anything that can read Zarr (like xarray), without having to write a dedicated “kerchunk” reader. The raw data are still in the HDF5 files pushed to Azure by NODD. This actually reveals a bit about how Kerchunk works: our “mapper”, m, already has all of the metadata in memory.
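
To make that concrete, here’s a small, illustrative check (nothing below touches the network; the exact reference count depends on the index file):

# illustrative: the mapper is backed by fsspec's ReferenceFileSystem; the
# extracted metadata and chunk references are all held in memory
type(m.fs), len(m.fs.references)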

Zarr uses a simple dictionary-like interface with well-known keys. You can get the global attributes at .zattrs.

json.loads(m[".zattrs"])  # matches channel_rt.attrs
{'Conventions': 'CF-1.6',
 'NWM_version_number': 'v2.2',
 'TITLE': 'OUTPUT FROM NWM v2.2',
 'cdm_datatype': 'Station',
 'code_version': 'v5.2.0-beta2',
 'dev': 'dev_ prefix indicates development/internal meta data',
 'dev_NOAH_TIMESTEP': 3600,
 'dev_OVRTSWCRT': 1,
 'dev_channelBucket_only': 0,
 'dev_channel_only': 0,
 'featureType': 'timeSeries',
 'model_configuration': 'short_range',
 'model_initialization_time': '2022-06-29_00:00:00',
 'model_output_type': 'channel_rt',
 'model_output_valid_time': '2022-06-29_01:00:00',
 'model_total_valid_times': 18,
 'proj4': '+proj=lcc +units=m +a=6370000.0 +b=6370000.0 +lat_1=30.0 +lat_2=60.0 +lat_0=40.0 +lon_0=-97.0 +x_0=0 +y_0=0 +k_0=1.0 +nadgrids=@',
 'station_dimension': 'feature_id',
 'stream_order_output': 1}

Or the attributes for a specific array:

json.loads(m["streamflow/.zattrs"])  # channel_rt.streamflow.attrs
{'_ARRAY_DIMENSIONS': ['time', 'feature_id'],
 'add_offset': 0.0,
 'coordinates': 'latitude longitude',
 'grid_mapping': 'crs',
 'long_name': 'River Flow',
 'missing_value': -999900,
 'scale_factor': 0.009999999776482582,
 'units': 'm3 s-1',
 'valid_range': [0, 5000000]}

Those attributes are saved in the Kerchunk index file, so reading them doesn’t require any (additional) HTTP requests. But we don’t want to re-save the large data variables, since we don’t want to host the data twice. So what happens when you want to read a data variable? We can look at the references attribute backing this mapping to find out.

m.fs.references["streamflow/0.0"]
['abfs://nwm/nwm.20220629/short_range/nwm.t00z.short_range.channel_rt.f001.conus.nc',
 614098,
 1860920]

In a typical Zarr dataset, accessing an array at <name>/0.0 would give you the actual bytes for the data at that location. But recall that with Kerchunk the original data is buried in some NetCDF file. Kerchunk has extracted three pieces of information:

  1. The URL where this chunk of data comes from

  2. The offset within that file

  3. The size of the chunk of data, in bytes, on disk

Thanks to HTTP range requests (the same thing that powers streaming video), we can request just the subset of the file we need. When a high-level library like xarray asks for the data in that chunk, this toolchain (zarr, fsspec’s reference filesystem, and adlfs) makes the HTTP range request in the background and delivers the bytes.
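
Just to illustrate the mechanism, here’s a sketch of that ranged read done by hand, using the URL, offset, and length from the reference we printed above (normally you’d let zarr, fsspec, and adlfs do this for you):

# illustrative: fetch just the bytes for the "streamflow/0.0" chunk
url, offset, length = m.fs.references["streamflow/0.0"]
blob_fs = adlfs.AzureBlobFileSystem(account_name="noaanwm")
raw = blob_fs.cat_file(url, start=offset, end=offset + length)
len(raw)  # the size of the compressed chunk, in bytes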

%time df = channel_rt.isel(time=0)[["streamflow", "velocity"]].load().to_dataframe()
CPU times: user 359 ms, sys: 61 ms, total: 420 ms
Wall time: 426 ms
import datashader
import colorcet
import numpy as np

# rasterize the ~2.8 million (streamflow, velocity) points on a log scale
cvs = datashader.Canvas(plot_width=600, plot_height=600)
agg = cvs.points(np.log1p(df[["streamflow", "velocity"]]), "streamflow", "velocity")
img = datashader.tf.shade(agg, cmap=colorcet.fire)
img

One very important caveat: because Kerchunk is just an index on the existing data, we inherit all of the limitations of its chunking structure. This dataset is chunked by hour along time. If you want all the data for a single timestamp, that’s ideal; if you want a long timeseries for a single feature, every hourly chunk has to be read. We’ll return to this later.
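
As a quick, illustrative check, we can count how many Dask chunks each access pattern touches:

# one timestamp maps to a single chunk, while a timeseries at a single
# feature_id has to touch every hourly chunk
one_time = channel_rt.streamflow.isel(time=0)
one_point = channel_rt.streamflow.isel(feature_id=0)
one_time.data.npartitions, one_point.data.npartitions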

If you’re interested in generating the Kerchunk files, see the processing directory. I used the pangeo-forge library to write a “recipe” for generating Kerchunk index files from a bunch of NetCDF files and stitching them together properly. I then executed that job using the same Kubernetes cluster we’re using for this workshop.
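
For a feel of what that recipe does under the hood, here’s a minimal, illustrative sketch using the kerchunk library directly. The file list is a placeholder; the real recipe handles thousands of files and writes the combined reference out to blob storage:

import fsspec
from kerchunk.hdf import SingleHdf5ToZarr
from kerchunk.combine import MultiZarrToZarr

# index each NetCDF/HDF5 file individually...
urls = [
    "abfs://nwm/nwm.20220629/short_range/nwm.t00z.short_range.channel_rt.f001.conus.nc",
    # ... one entry per file
]
singles = []
for url in urls:
    with fsspec.open(url, account_name="noaanwm") as f:
        singles.append(SingleHdf5ToZarr(f, url).translate())

# ...then combine the per-file references along the time dimension
combined = MultiZarrToZarr(
    singles,
    concat_dims=["time"],
    remote_protocol="abfs",
    remote_options={"account_name": "noaanwm"},
).translate()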

Exercise: Have fun!#

I’ve also created Kerchunk indexes for the 0-hour forecasts for land and forcing. These are pretty large datasets, so be careful about accessing too much data. The index files alone are ~200 MB of JSON.

%%time
fo = requests.get(
    "https://noaanwm.blob.core.windows.net/ciroh/short-range-land-kerchunk/reference.json"
).json()
m = fsspec.get_mapper(
    "reference://",
    fo=fo,
    remote_options={"account_name": "noaanwm"},
    target_options={"account_name": "noaanwm"},
)

land = xr.open_dataset(m, engine="zarr", consolidated=False, chunks={"time": 1})
land
CPU times: user 5.91 s, sys: 690 ms, total: 6.6 s
Wall time: 7.15 s
<xarray.Dataset>
Dimensions:         (time: 6992, y: 3840, x: 4608, reference_time: 1)
Coordinates:
  * reference_time  (reference_time) datetime64[ns] 2022-06-29
  * time            (time) datetime64[ns] 2022-06-29T01:00:00 ... 2023-04-26T...
  * x               (x) float64 -2.303e+06 -2.302e+06 ... 2.303e+06 2.304e+06
  * y               (y) float64 -1.92e+06 -1.919e+06 ... 1.918e+06 1.919e+06
Data variables:
    ACCET           (time, y, x) float64 dask.array<chunksize=(1, 768, 922), meta=np.ndarray>
    FSNO            (time, y, x) float64 dask.array<chunksize=(1, 768, 922), meta=np.ndarray>
    SNEQV           (time, y, x) float64 dask.array<chunksize=(1, 768, 922), meta=np.ndarray>
    SNOWH           (time, y, x) float64 dask.array<chunksize=(1, 768, 922), meta=np.ndarray>
    SNOWT_AVG       (time, y, x) float64 dask.array<chunksize=(1, 768, 922), meta=np.ndarray>
    SOILSAT_TOP     (time, y, x) float64 dask.array<chunksize=(1, 768, 922), meta=np.ndarray>
    crs             (time) object dask.array<chunksize=(1,), meta=np.ndarray>
Attributes:
    Conventions:                CF-1.6
    GDAL_DataType:              Generic
    NWM_version_number:         v2.2
    TITLE:                      OUTPUT FROM NWM v2.2
    code_version:               v5.2.0-beta2
    model_configuration:        short_range
    model_initialization_time:  2022-06-29_00:00:00
    model_output_type:          land
    model_output_valid_time:    2022-06-29_01:00:00
    model_total_valid_times:    18
    proj4:                      +proj=lcc +units=m +a=6370000.0 +b=6370000.0 ...
%%time
fo = requests.get(
    "https://noaanwm.blob.core.windows.net/ciroh/short-range-forcing-kerchunk/reference.json"
).json()
m = fsspec.get_mapper(
    "reference://",
    fo=fo,
    remote_options={"account_name": "noaanwm"},
    target_options={"account_name": "noaanwm"},
)

forcing = xr.open_dataset(m, engine="zarr", consolidated=False, chunks={"time": 1})
forcing
CPU times: user 8.05 s, sys: 1.03 s, total: 9.08 s
Wall time: 9.85 s
<xarray.Dataset>
Dimensions:         (time: 7306, y: 3840, x: 4608, reference_time: 1)
Coordinates:
  * reference_time  (reference_time) datetime64[ns] 2022-06-29
  * time            (time) datetime64[ns] 2022-06-29T01:00:00 ... 2023-04-29T...
  * x               (x) float64 -2.303e+06 -2.302e+06 ... 2.303e+06 2.304e+06
  * y               (y) float64 -1.92e+06 -1.919e+06 ... 1.918e+06 1.919e+06
Data variables:
    LWDOWN          (time, y, x) float64 dask.array<chunksize=(1, 768, 922), meta=np.ndarray>
    PSFC            (time, y, x) float64 dask.array<chunksize=(1, 768, 922), meta=np.ndarray>
    Q2D             (time, y, x) float64 dask.array<chunksize=(1, 768, 922), meta=np.ndarray>
    RAINRATE        (time, y, x) float32 dask.array<chunksize=(1, 768, 922), meta=np.ndarray>
    SWDOWN          (time, y, x) float64 dask.array<chunksize=(1, 768, 922), meta=np.ndarray>
    T2D             (time, y, x) float64 dask.array<chunksize=(1, 768, 922), meta=np.ndarray>
    U2D             (time, y, x) float64 dask.array<chunksize=(1, 768, 922), meta=np.ndarray>
    V2D             (time, y, x) float64 dask.array<chunksize=(1, 768, 922), meta=np.ndarray>
    crs             object ...
Attributes:
    NWM_version_number:         v2.2
    model_configuration:        short_range
    model_initialization_time:  2022-06-29_00:00:00
    model_output_type:          forcing
    model_output_valid_time:    2022-06-29_01:00:00
    model_total_valid_times:    18

As an example, let’s load up the state boundary for Iowa from the Planetary Computer’s US Census dataset.

# Get the state boundary for Iowa
catalog = pystac_client.Client.open(
    "http://planetarycomputer.microsoft.com/api/stac/v1",
    modifier=planetary_computer.sign_inplace,
)
collection = catalog.get_collection("us-census")

states = collection.get_item("2020-cb_2020_us_state_500k").assets["data"]

df = geopandas.read_parquet(
    states.href,
    storage_options=states.extra_fields["table:storage_options"],
    columns=["STUSPS", "geometry"],
)
ia = df.loc[df.STUSPS == "IA", "geometry"].item()
ia
(Output: the Iowa state boundary rendered as an SVG polygon.)

We can clip the full forcing data down to that state boundary (as long as we’re careful with reference systems):

import pyproj

crs = pyproj.CRS.from_cf(forcing.crs.attrs)
forcing = forcing.rio.write_crs(crs)

# Note this is still lazy
subset = forcing.T2D.rio.clip([ia], crs="epsg:4326")
subset
<xarray.DataArray 'T2D' (time: 7306, y: 337, x: 519)>
dask.array<getitem, shape=(7306, 337, 519), dtype=float64, chunksize=(1, 328, 433), chunktype=numpy.ndarray>
Coordinates:
  * time     (time) datetime64[ns] 2022-06-29T01:00:00 ... 2023-04-29T11:00:00
  * x        (x) float64 2.95e+04 3.05e+04 3.15e+04 ... 5.465e+05 5.475e+05
  * y        (y) float64 5.65e+04 5.75e+04 5.85e+04 ... 3.915e+05 3.925e+05
    crs      int64 0
Attributes:
    cell_methods:    time: point
    esri_pe_string:  PROJCS["Lambert_Conformal_Conic",GEOGCS["GCS_Sphere",DAT...
    grid_mapping:    crs
    long_name:       2-m Air Temperature
    proj4:           +proj=lcc +units=m +a=6370000.0 +b=6370000.0 +lat_1=30.0...
    remap:           remapped via ESMF regrid_with_weights: Bilinear
    standard_name:   air_temperature
    units:           K
subset.isel(time=0).plot(size=8, aspect=1.6);
(Output: a map of the clipped 2-m air temperature over Iowa at the first timestep.)

Spend a bit of time playing around with these datasets.
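
For example (illustrative, and deliberately small so we don’t pull too much data), an area-mean temperature timeseries over the Iowa subset for the first 24 forecast hours:

# area-mean 2-m air temperature over Iowa for the first 24 hours; each hour
# only needs the handful of chunks that intersect the clipped region
ts = subset.isel(time=slice(0, 24)).mean(dim=["x", "y"]).compute()
ts.plot();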

When you’re done here, close down the kernel and move on to Timeseries.