Tutorial

Intake-thredds provides an interface that combines functionality from siphon and intake to retrieve data from THREDDS data servers. This tutorial introduces the API and features of intake-thredds. Let’s begin by importing intake.

import intake

Loading a catalog

You can load data from a THREDDS catalog by providing the URL to a valid THREDDS catalog:

cat_url = 'https://psl.noaa.gov/thredds/catalog/Datasets/noaa.ersst/catalog.xml'
catalog = intake.open_thredds_cat(cat_url)
catalog

Using the catalog

Once you’ve loaded a catalog, you can display its contents by iterating over its entries:

list(catalog)
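
Since iterating over a catalog yields entry names, ordinary Python idioms apply to them. For example, a minimal sketch that filters entries by name (the substring 'mnmean' is illustrative):

[name for name in catalog if 'mnmean' in name]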

Once you’ve identified a dataset of interest, you can access it as follows:

source = catalog['err.mnmean.v3.nc']
print(source)

Loading a dataset

To load a dataset of interest, use the to_dask() method, which is available on the source object:

%%time
ds = source().to_dask()
ds

The to_dask() method reads only the metadata needed to construct an xarray.Dataset. The actual data are streamed over the network when computation routines are invoked on the dataset. By default, intake-thredds uses chunks={}, which loads the dataset with dask using a single chunk for each array. You can use a different chunking scheme by providing a custom value of chunks before calling .to_dask():

%%time
# Use a custom chunking scheme
ds = source(chunks={'time': 100, 'lon': 90}).to_dask()
ds
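
Because loading is lazy, data are transferred only once a computation is triggered. As a minimal sketch (the variable name sst is an assumption; substitute a variable actually present in your dataset):

# Computing a reduction pulls the required data over the network.
# 'sst' is an assumed variable name; substitute one from your dataset.
mean_field = ds['sst'].mean(dim='time').compute()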

Working with nested catalogs

In some scenarios, a THREDDS catalog references other THREDDS catalogs. This results in a nested structure consisting of a parent catalog and child catalogs:

cat_url = 'https://psl.noaa.gov/thredds/catalog.xml'
catalog = intake.open_thredds_cat(cat_url)
list(catalog)
print(list(catalog['Datasets']))
print(list(catalog['Datasets']['ncep.reanalysis.dailyavgs']))

To load data from such a nested catalog, intake-thredds provides a special source object, THREDDSMergedSource, accessible via the .open_thredds_merged() function. The inputs for this function consist of:

  • url: top level URL of the THREDDS catalog

  • path: a list of paths to descend through child catalogs. Paths can include glob characters (* and ?) for pattern matching.

source = intake.open_thredds_merged(
    cat_url, path=['Datasets', 'ncep.reanalysis.dailyavgs', 'surface', 'air*sig995*194*.nc']
)
print(source)
<intake_thredds.source.THREDDSMergedSource object at 0x7f62552e3510>

To load the data into an xarray Dataset, you can invoke the .to_dask() method. Internally, THREDDSMergedSource does the following (a conceptual sketch follows the list):

  • descends the given paths and collects all available datasets;

  • loads each dataset into an xarray.Dataset;

  • combines all loaded datasets into a single dataset.
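
Conceptually, this is similar to the following manual xarray workflow. This is a sketch only: the OPeNDAP URLs are illustrative, and the actual implementation may differ.

import xarray as xr

# OPeNDAP URLs as they might be collected by descending the catalog
# (illustrative; adjust to the datasets you actually matched).
urls = [
    'https://psl.noaa.gov/thredds/dodsC/Datasets/ncep.reanalysis.dailyavgs/surface/air.sig995.1948.nc',
    'https://psl.noaa.gov/thredds/dodsC/Datasets/ncep.reanalysis.dailyavgs/surface/air.sig995.1949.nc',
]
# Open each dataset lazily with dask, then combine along shared coordinates.
datasets = [xr.open_dataset(url, chunks={}) for url in urls]
combined = xr.combine_by_coords(datasets)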

%%time
ds = source.to_dask()
ds
CPU times: user 1.14 s, sys: 105 ms, total: 1.24 s
Wall time: 4.58 s

<xarray.Dataset> Size: 31MB
Dimensions:    (time: 731, lat: 73, lon: 144, nbnds: 2)
Coordinates:
  * lon        (lon) float32 576B 0.0 2.5 5.0 7.5 ... 350.0 352.5 355.0 357.5
  * time       (time) datetime64[ns] 6kB 1948-01-01 1948-01-02 ... 1949-12-31
  * lat        (lat) float32 292B 90.0 87.5 85.0 82.5 ... -85.0 -87.5 -90.0
Dimensions without coordinates: nbnds
Data variables:
    air        (time, lat, lon) float32 31MB dask.array<chunksize=(366, 73, 144), meta=np.ndarray>
    time_bnds  (time, nbnds) float64 12kB dask.array<chunksize=(366, 2), meta=np.ndarray>
Attributes:
    Conventions:                     COARDS
    title:                           mean daily NMC reanalysis (1948)
    description:                     Data is from NMC initialized reanalysis\...
    platform:                        Model
    history:                         created 99/05/11 by Hoop (netCDF2.3)\nCo...
    dataset_title:                   NCEP-NCAR Reanalysis 1
    References:                      http://www.psl.noaa.gov/data/gridded/dat...
    _NCProperties:                   version=2,netcdf=4.6.3,hdf5=1.10.5
    DODS_EXTRA.Unlimited_Dimension:  time

Caching

Under the hood, intake-thredds uses driver='opendap' from intake-xarray by default. You can also choose driver='netcdf', which in combination with fsspec caches files locally when simplecache:: is prepended to the URL; see the fsspec docs.
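
For background, here is a minimal sketch of the simplecache:: mechanism using fsspec alone (the URL is illustrative): the first open downloads the file into the cache folder, and subsequent opens read the local copy.

import fsspec

# First open downloads the file to 'my_caching_folder'; later opens reuse it.
of = fsspec.open(
    'simplecache::https://example.com/data.nc',
    simplecache={'cache_storage': 'my_caching_folder'},
)
with of as f:
    header = f.read(4)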

import os

import fsspec

# specify the caching location; store files under their original names
fsspec.config.conf['simplecache'] = {'cache_storage': 'my_caching_folder', 'same_names': True}

cat_url = 'https://psl.noaa.gov/thredds/catalog.xml'
source = intake.open_thredds_merged(
    f'simplecache::{cat_url}',
    path=['Datasets', 'ncep.reanalysis.dailyavgs', 'surface', 'air.sig995.194*.nc'],
    driver='netcdf',  # specify netcdf driver to open HTTPServer
)
print(source)
<intake_thredds.source.THREDDSMergedSource object at 0x7f625513b110>
%time ds = source.to_dask()
assert os.path.exists('my_caching_folder/air.sig995.1949.nc')
# after caching, the second read is much faster
%time ds = source.to_dask()

Multi-file concat_kwargs with cfgrib engine

Another example demonstrating caching reads in ensemble members for GEFS in parallel. The example below reads 21 ensemble members for a single timestep and concatenates them along a new number dimension via concat_kwargs. It also demonstrates xarray_kwargs, which are passed on to xarray for opening the files; in this case the cfgrib engine is used:

cat_url = 'https://www.ncei.noaa.gov/thredds/catalog/model-gefs-003/202008/20200831/catalog.xml'
source = intake.open_thredds_merged(
    f'simplecache::{cat_url}',
    path=["NCEP gens-a Grid 3 Member-Forecast *-372 for 2020-08-31 00:00*"],
    driver="netcdf",
    concat_kwargs={"dim": "number"},
    xarray_kwargs=dict(
        engine="cfgrib",
        backend_kwargs=dict(
            filter_by_keys={"typeOfLevel": "heightAboveGround", "cfVarName": "t2m"}
        ),
    ),
)
source.to_dask()