Tutorial
Intake-thredds provides an interface that combines functionality from siphon and intake to retrieve data from THREDDS data servers. This tutorial provides an introduction to the API and features of intake-thredds. Let's begin by importing intake.
import intake
Loading a catalog
You can load data from a THREDDS catalog by providing the URL to a valid THREDDS catalog:
cat_url = 'https://psl.noaa.gov/thredds/catalog/Datasets/noaa.ersst/catalog.xml'
catalog = intake.open_thredds_cat(cat_url)
catalog
Using the catalog
Once you’ve loaded a catalog, you can display its contents by iterating over its entries:
list(catalog)
Once you’ve identified a dataset of interest, you can access it as follows:
source = catalog['err.mnmean.v3.nc']
print(source)
Loading a dataset
To load a dataset of interest, you can use the to_dask() method, which is available on source objects:
%%time
ds = source().to_dask()
ds
The to_dask() method reads only the metadata needed to construct an xarray.Dataset. The actual data are streamed over the network when computation routines are invoked on the dataset.
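For example, no array data is transferred until a computation is triggered. A minimal sketch (the variable name 'err' is an assumption about this particular file; substitute any entry from ds.data_vars):
# Lazy: at this point ds only holds dask placeholders.
# Calling .compute() streams the required values from the server.
time_mean = ds['err'].mean(dim='time').compute()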
By default, intake-thredds uses chunks={} to load the dataset with dask using a single chunk for all arrays. You can use a different chunking scheme by providing a custom value of chunks before calling .to_dask():
%%time
# Use a custom chunking scheme
ds = source(chunks={'time': 100, 'lon': 90}).to_dask()
ds
Working with nested catalogs
In some scenarios, a THREDDS catalog can reference another THREDDS catalog. This results in a nested structure consisting of a parent catalog and child catalogs:
cat_url = 'https://psl.noaa.gov/thredds/catalog.xml'
catalog = intake.open_thredds_cat(cat_url)
list(catalog)
print(list(catalog['Datasets']))
print(list(catalog['Datasets']['ncep.reanalysis.dailyavgs']))
To load data from such a nested catalog, intake-thredds provides a special source object, THREDDSMergedSource, accessible via the .open_thredds_merged() function. The inputs for this function consist of:
url: top-level URL of the THREDDS catalog
path: a list of paths for child catalogs to descend down. The paths can include the glob characters * and ?, which are used for matching.
source = intake.open_thredds_merged(
cat_url, path=['Datasets', 'ncep.reanalysis.dailyavgs', 'surface', 'air*sig995*194*.nc']
)
print(source)
<intake_thredds.source.THREDDSMergedSource object at 0x7f62552e3510>
To load the data into an xarray Dataset, you can invoke the .to_dask() method. Internally, THREDDSMergedSource does the following (a conceptual sketch follows the output below):
1. descend down the given paths and collect all available datasets.
2. load each dataset into an xarray.Dataset.
3. combine all loaded datasets into a single dataset.
%%time
ds = source.to_dask()
ds
Dataset(s): 100%|████████████████████████████████| 2/2 [00:03<00:00, 1.74s/it]
CPU times: user 1.14 s, sys: 105 ms, total: 1.24 s
Wall time: 4.58 s
<xarray.Dataset> Size: 31MB
Dimensions:    (time: 731, lat: 73, lon: 144, nbnds: 2)
Coordinates:
  * lon        (lon) float32 576B 0.0 2.5 5.0 7.5 ... 350.0 352.5 355.0 357.5
  * time       (time) datetime64[ns] 6kB 1948-01-01 1948-01-02 ... 1949-12-31
  * lat        (lat) float32 292B 90.0 87.5 85.0 82.5 ... -85.0 -87.5 -90.0
Dimensions without coordinates: nbnds
Data variables:
    air        (time, lat, lon) float32 31MB dask.array<chunksize=(366, 73, 144), meta=np.ndarray>
    time_bnds  (time, nbnds) float64 12kB dask.array<chunksize=(366, 2), meta=np.ndarray>
Attributes:
    Conventions:                     COARDS
    title:                           mean daily NMC reanalysis (1948)
    description:                     Data is from NMC initialized reanalysis\...
    platform:                        Model
    history:                         created 99/05/11 by Hoop (netCDF2.3)\nCo...
    dataset_title:                   NCEP-NCAR Reanalysis 1
    References:                      http://www.psl.noaa.gov/data/gridded/dat...
    _NCProperties:                   version=2,netcdf=4.6.3,hdf5=1.10.5
    DODS_EXTRA.Unlimited_Dimension:  time
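For intuition, the three steps above are roughly equivalent to walking the catalog with siphon and combining the files yourself. A rough sketch, not the actual implementation (the hard-coded catalog URL and the choice of xr.combine_by_coords are illustrative assumptions):
import fnmatch
import xarray as xr
from siphon.catalog import TDSCatalog

# 1. descend: Datasets -> ncep.reanalysis.dailyavgs -> surface
cat = TDSCatalog(
    'https://psl.noaa.gov/thredds/catalog/Datasets/ncep.reanalysis.dailyavgs/surface/catalog.xml'
)
# collect OPeNDAP access URLs for datasets matching the glob pattern
urls = [
    ds.access_urls['OPENDAP']
    for name, ds in cat.datasets.items()
    if fnmatch.fnmatch(name, 'air*sig995*194*.nc')
]
# 2. load each dataset lazily; 3. combine into a single dataset
dsets = [xr.open_dataset(url, chunks={}) for url in urls]
ds = xr.combine_by_coords(dsets)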
Caching
Under the hood, intake-thredds uses driver='opendap' from intake-xarray by default. You can also choose driver='netcdf', which in combination with fsspec caches files when you prepend simplecache:: to the URL; see the fsspec docs.
import os
import fsspec
# specify the caching location, where files are stored with their original names
fsspec.config.conf['simplecache'] = {'cache_storage': 'my_caching_folder', 'same_names': True}
cat_url = 'https://psl.noaa.gov/thredds/catalog.xml'
source = intake.open_thredds_merged(
f'simplecache::{cat_url}',
path=['Datasets', 'ncep.reanalysis.dailyavgs', 'surface', 'air.sig995.194*.nc'],
driver='netcdf', # specify netcdf driver to open HTTPServer
)
print(source)
<intake_thredds.source.THREDDSMergedSource object at 0x7f625513b110>
%time ds = source.to_dask()
assert os.path.exists('my_caching_folder/air.sig995.1949.nc')
# after caching, subsequent loads are very fast
%time ds = source.to_dask()
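To see the caching mechanism in isolation, here is a minimal sketch using fsspec's simplecache protocol directly on a single file URL (the fileServer URL below is an assumption based on the catalog layout):
import fsspec

# simplecache:: wraps the underlying protocol: on first access the remote
# file is downloaded into cache_storage; later opens read the local copy.
url = (
    'simplecache::https://psl.noaa.gov/thredds/fileServer/'
    'Datasets/ncep.reanalysis.dailyavgs/surface/air.sig995.1948.nc'
)
with fsspec.open(url, simplecache={'cache_storage': 'my_caching_folder', 'same_names': True}) as f:
    header = f.read(4)  # reading any bytes triggers the one-time download
print(header)  # netCDF magic bytes, e.g. b'CDF\x01' or b'\x89HDF'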
Multi-file concat_kwargs with cfgrib engine
Another example demonstrating how to use caching consists of reading in ensemble members in parallel for GEFS. The example below reads in 21 ensemble members for a single timestep. It also demonstrates the usage of xarray_kwargs, which are passed on to xarray for opening the files, in this case using the cfgrib engine:
cat_url = 'https://www.ncei.noaa.gov/thredds/catalog/model-gefs-003/202008/20200831/catalog.xml'
source = intake.open_thredds_merged(
f'simplecache::{cat_url}',
path=["NCEP gens-a Grid 3 Member-Forecast *-372 for 2020-08-31 00:00*"],
driver="netcdf",
concat_kwargs={"dim": "number"},
xarray_kwargs=dict(
engine="cfgrib",
backend_kwargs=dict(
filter_by_keys={"typeOfLevel": "heightAboveGround", "cfVarName": "t2m"}
),
),
)
source.to_dask()
Dataset(s): 100%|██████████████████████████████| 21/21 [00:18<00:00, 1.15it/s]
<xarray.Dataset> Size: 0B
Dimensions:  ()
Data variables:
    *empty*
Attributes:
    Conventions:  CF-1.7
    history:      2025-01-15T14:16 GRIB to CDM+CF via cfgrib-0.9.15.0/ecCodes...
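The concat_kwargs are forwarded to the combine step when THREDDSMergedSource merges the per-member datasets. A toy sketch of what concat_kwargs={'dim': 'number'} amounts to (ds0 and ds1 are hypothetical stand-ins for two opened ensemble members):
import numpy as np
import xarray as xr

# Hypothetical stand-ins for two already-opened ensemble-member datasets.
ds0 = xr.Dataset({'t2m': (('lat', 'lon'), np.zeros((2, 2)))})
ds1 = xr.Dataset({'t2m': (('lat', 'lon'), np.ones((2, 2)))})
# concat_kwargs={'dim': 'number'} stacks the members along a new dimension:
ds = xr.concat([ds0, ds1], dim='number')
print(ds.sizes)  # {'number': 2, 'lat': 2, 'lon': 2}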