5 Minute Tutorial#

OPeNDAP - the vision#

The original vision of OPeNDAP (Cornillion, et al 1993) was to democratize remote data access, by making the equivalencies

\( \;\;\;\;\;\;\;\;\;\;\;\;\;\;\;\;\;\;\;\;\;\;\;\;\;\;\;\;\;\;\;\;\;\;\;\;\; \boxed{\text{URL} \approx \text{Remote Dataset} }\)

\( \;\;\;\;\;\;\;\;\;\;\;\;\;\;\;\;\;\;\; \boxed{\text{URL + Constraints} \approx \text{Subset of Remote Dataset}} \)

That led to the development of the DAP2 protocol (formerly known as DODS). Currently, OPeNDAP and Unidata servers implement the modern and broader DAP4 protocol (see DAP4 specification), to continue enabling the original vision of OPeNDAP.

What pydap enables:#

The internal logic of PyDAP enables the construction of constraint expressions for each url, interactively, hiding the abstraction away from the user. Furthermore, using PyDAP as a backend engine for Xarray, the original OPeNDAP vision can scaled with multi-core parallelism. Nonetheless, basic understanding about the use of Constraint Expression comes in handy when aggregating multiple files, and can lead to more efficient worklows.

Objectives:#

  • Demonstrate how to specify the DAP4 protocol to the remote server.

  • Use Xarray with PyDAP as the backend engine to download a subset of remote data in two user case scenarios: a) an NcML aggregation file (virtual dataset), and b) across two Netcdf files.

  • Demonstrate distinct ways to use Constraint Expression (CEs), and how these are passed down to the remote server so that subsetting is done by the server, in a data-proximate way, without performace loss on the client side.

Requirements#

Note

The vast majority of NASA’s OPeNDAP servers implement the DAP4 protocol.

from pydap.client import open_url, consolidate_metadata, create_session
import xarray as xr
import numpy as np
# create a session to inspect downloads. cache_name must have `debug`
session = create_session(use_cache=True, cache_kwargs={"cache_name":'data/debug_case1'})
session.cache.clear()

Case 1) Subsetting an NcML file#

The file is an NcML file representing a virtually aggregated dataset, which can be found in the test server and it is named: aggExisting.ncml.

OPeNDAP servers can be configured to produce NcML virtual datasets. Their advantage is that with an individual OPeNDAP url, a user has access to an entire collection of files from which to subset.

ncml_url = "http://test.opendap.org/opendap/data/ncml/agg/aggExisting.ncml"
dap4_ncml_url = ncml_url.replace("http",  "dap4")
print("=============================================================\n Remote DAP4 URL: \n", dap4_ncml_url, "\n=============================================================")
=============================================================
 Remote DAP4 URL: 
 dap4://test.opendap.org/opendap/data/ncml/agg/aggExisting.ncml 
=============================================================
ds = xr.open_dataset(
    dap4_ncml_url, 
    engine='pydap',
    session = session,
    chunks={},
)
ds
---------------------------------------------------------------------------
ResponseError                             Traceback (most recent call last)
ResponseError: too many 500 error responses

The above exception was the direct cause of the following exception:

MaxRetryError                             Traceback (most recent call last)
File ~/mambaforge/envs/pydap_docs/lib/python3.11/site-packages/requests/adapters.py:696, in HTTPAdapter.send(self, request, stream, timeout, verify, cert, proxies)
    695 try:
--> 696     resp = conn.urlopen(
    697         method=request.method,
    698         url=url,
    699         body=request.body,  # type: ignore[arg-type]  # urllib3 stubs don't accept Iterable[bytes | str]
    700         headers=request.headers,  # type: ignore[arg-type]  # urllib3#3072
    701         redirect=False,
    702         assert_same_host=False,
    703         preload_content=False,
    704         decode_content=False,
    705         retries=self.max_retries,
    706         timeout=resolved_timeout,
    707         chunked=chunked,
    708     )
    710 except (ProtocolError, OSError) as err:

File ~/mambaforge/envs/pydap_docs/lib/python3.11/site-packages/urllib3/connectionpool.py:955, in HTTPConnectionPool.urlopen(self, method, url, body, headers, retries, redirect, assert_same_host, timeout, pool_timeout, release_conn, chunked, body_pos, preload_content, decode_content, **response_kw)
    954     log.debug("Retry: %s", url)
--> 955     return self.urlopen(
    956         method,
    957         url,
    958         body,
    959         headers,
    960         retries=retries,
    961         redirect=redirect,
    962         assert_same_host=assert_same_host,
    963         timeout=timeout,
    964         pool_timeout=pool_timeout,
    965         release_conn=release_conn,
    966         chunked=chunked,
    967         body_pos=body_pos,
    968         preload_content=preload_content,
    969         decode_content=decode_content,
    970         **response_kw,
    971     )
    973 return response

File ~/mambaforge/envs/pydap_docs/lib/python3.11/site-packages/urllib3/connectionpool.py:955, in HTTPConnectionPool.urlopen(self, method, url, body, headers, retries, redirect, assert_same_host, timeout, pool_timeout, release_conn, chunked, body_pos, preload_content, decode_content, **response_kw)
    954     log.debug("Retry: %s", url)
--> 955     return self.urlopen(
    956         method,
    957         url,
    958         body,
    959         headers,
    960         retries=retries,
    961         redirect=redirect,
    962         assert_same_host=assert_same_host,
    963         timeout=timeout,
    964         pool_timeout=pool_timeout,
    965         release_conn=release_conn,
    966         chunked=chunked,
    967         body_pos=body_pos,
    968         preload_content=preload_content,
    969         decode_content=decode_content,
    970         **response_kw,
    971     )
    973 return response

File ~/mambaforge/envs/pydap_docs/lib/python3.11/site-packages/urllib3/connectionpool.py:945, in HTTPConnectionPool.urlopen(self, method, url, body, headers, retries, redirect, assert_same_host, timeout, pool_timeout, release_conn, chunked, body_pos, preload_content, decode_content, **response_kw)
    944 try:
--> 945     retries = retries.increment(method, url, response=response, _pool=self)
    946 except MaxRetryError:

File ~/mambaforge/envs/pydap_docs/lib/python3.11/site-packages/urllib3/util/retry.py:543, in Retry.increment(self, method, url, response, error, _pool, _stacktrace)
    542     reason = error or ResponseError(cause)
--> 543     raise MaxRetryError(_pool, url, reason) from reason  # type: ignore[arg-type]
    545 log.debug("Incremented Retry for (url='%s'): %r", url, new_retry)

MaxRetryError: HTTPConnectionPool(host='test.opendap.org', port=80): Max retries exceeded with url: /opendap/data/ncml/agg/aggExisting.ncml.dmr (Caused by ResponseError('too many 500 error responses'))

During handling of the above exception, another exception occurred:

RetryError                                Traceback (most recent call last)
Cell In[4], line 1
----> 1 ds = xr.open_dataset(
      2     dap4_ncml_url,
      3     engine='pydap',
      4     session = session,

File ~/mambaforge/envs/pydap_docs/lib/python3.11/site-packages/xarray/backends/api.py:607, in open_dataset(filename_or_obj, engine, chunks, cache, decode_cf, mask_and_scale, decode_times, decode_timedelta, use_cftime, concat_characters, decode_coords, drop_variables, create_default_indexes, inline_array, chunked_array_type, from_array_kwargs, backend_kwargs, **kwargs)
    595 decoders = _resolve_decoders_kwargs(
    596     decode_cf,
    597     open_backend_dataset_parameters=backend.open_dataset_parameters,
   (...)    603     decode_coords=decode_coords,
    604 )
    606 overwrite_encoded_chunks = kwargs.pop("overwrite_encoded_chunks", None)
--> 607 backend_ds = backend.open_dataset(
    608     filename_or_obj,
    609     drop_variables=drop_variables,
    610     **decoders,
    611     **kwargs,
    612 )
    613 ds = _dataset_from_backend_dataset(
    614     backend_ds,
    615     filename_or_obj,
   (...)    626     **kwargs,
    627 )
    628 return ds

File ~/mambaforge/envs/pydap_docs/lib/python3.11/site-packages/xarray/backends/pydap_.py:279, in PydapBackendEntrypoint.open_dataset(self, filename_or_obj, mask_and_scale, decode_times, concat_characters, decode_coords, drop_variables, use_cftime, decode_timedelta, group, application, session, output_grid, timeout, verify, user_charset, checksums)
    257 def open_dataset(
    258     self,
    259     filename_or_obj: (
   (...)    277     checksums=True,
    278 ) -> Dataset:
--> 279     store = PydapDataStore.open(
    280         url=filename_or_obj,
    281         group=group,
    282         application=application,
    283         session=session,
    284         output_grid=output_grid,
    285         timeout=timeout,
    286         verify=verify,
    287         user_charset=user_charset,
    288         checksums=checksums,
    289     )
    290     store_entrypoint = StoreBackendEntrypoint()
    291     with close_on_error(store):

File ~/mambaforge/envs/pydap_docs/lib/python3.11/site-packages/xarray/backends/pydap_.py:141, in PydapDataStore.open(cls, url, group, application, session, output_grid, timeout, verify, user_charset, checksums)
    130 kwargs = {
    131     "url": url,
    132     "application": application,
   (...)    137     "user_charset": user_charset,
    138 }
    139 if isinstance(url, str):
    140     # check uit begins with an acceptable scheme
--> 141     dataset = open_url(**kwargs)
    142 elif hasattr(url, "ds"):
    143     # pydap dataset
    144     dataset = url.ds

File ~/mambaforge/envs/pydap_docs/lib/python3.11/site-packages/pydap/client.py:179, in open_url(url, application, session, output_grid, flat, timeout, verify, checksums, user_charset, protocol, batch, use_cache, session_kwargs, cache_kwargs, get_kwargs)
    172 if not session:
    173     session = create_session(
    174         use_cache=use_cache,
    175         session_kwargs=session_kwargs,
    176         cache_kwargs=cache_kwargs,
    177     )
--> 179 handler = DAPHandler(
    180     url,
    181     application,
    182     session,
    183     output_grid=output_grid,
    184     flat=flat,
    185     timeout=timeout,
    186     verify=verify,
    187     checksums=checksums,
    188     user_charset=user_charset,
    189     protocol=protocol,
    190     get_kwargs=get_kwargs,
    191 )
    192 dataset = handler.dataset
    193 dataset._session = session

File ~/mambaforge/envs/pydap_docs/lib/python3.11/site-packages/pydap/handlers/dap.py:163, in DAPHandler.__init__(self, url, application, session, output_grid, flat, timeout, verify, checksums, user_charset, protocol, get_kwargs)
    154     arg = (
    155         self.scheme,
    156         self.netloc,
   (...)    160         self.fragment,
    161     )
    162     self.base_url = urlunparse(arg)
--> 163     self.make_dataset()
    164 self.add_proxies()

File ~/mambaforge/envs/pydap_docs/lib/python3.11/site-packages/pydap/handlers/dap.py:198, in DAPHandler.make_dataset(self)
    194 def make_dataset(
    195     self,
    196 ):
    197     if self.protocol == "dap4":
--> 198         self.dataset_from_dap4()
    199     else:
    200         self.dataset_from_dap2()

File ~/mambaforge/envs/pydap_docs/lib/python3.11/site-packages/pydap/handlers/dap.py:218, in DAPHandler.dataset_from_dap4(self)
    207     path = self.path
    208 dmr_url = urlunparse(
    209     (
    210         self.scheme,
   (...)    216     )
    217 )
--> 218 r = GET(
    219     dmr_url,
    220     self.application,
    221     self.session,
    222     timeout=self.timeout,
    223     verify=self.verify,
    224     get_kwargs=self.get_kwargs,
    225 )
    226 dmr = safe_charset_text(r, self.user_charset)
    227 self.dataset = dmr_to_dataset(dmr, self.flat)

File ~/mambaforge/envs/pydap_docs/lib/python3.11/site-packages/pydap/net.py:74, in GET(url, application, session, timeout, verify, use_cache, session_kwargs, cache_kwargs, get_kwargs)
     72     _, _, path, _, query, fragment = urlparse(url)
     73     url = urlunparse(("", "", path, "", _quote(query), fragment))
---> 74 res = create_request(
     75     url,
     76     application=application,
     77     session=session,
     78     timeout=timeout,
     79     verify=verify,
     80     session_kwargs=session_kwargs,
     81     cache_kwargs=cache_kwargs,
     82     get_kwargs=get_kwargs,
     83 )
     84 if isinstance(res, webob_Request):
     85     res = get_response(res, application, verify=verify)

File ~/mambaforge/envs/pydap_docs/lib/python3.11/site-packages/pydap/net.py:172, in create_request(url, application, timeout, verify, session, session_kwargs, cache_kwargs, get_kwargs)
    170 session_kwargs = session_kwargs or {}
    171 try:
--> 172     req = get_request(
    173         url,
    174         base_session=session,
    175         timeout=timeout,
    176         verify=verify,
    177         get_kwargs=get_kwargs,
    178         backend=cache_kwargs.pop("backend", None),
    179         cache_name=cache_kwargs.pop("http-cache", None),
    180         backend_options=cache_kwargs.pop("backend_options", None),
    181         cache_kwargs=cache_kwargs,
    182         session_kwargs=session_kwargs,
    183     )
    184     req.raise_for_status()
    185     return req

File ~/mambaforge/envs/pydap_docs/lib/python3.11/site-packages/pydap/net.py:359, in get_request(url, base_session, timeout, verify, get_kwargs, backend, cache_name, backend_options, cache_kwargs, session_kwargs)
    357 if parsed.scheme == "https":
    358     http_url = urlunparse(parsed._replace(scheme="http"))
--> 359     req = s.get(
    360         http_url,
    361         timeout=timeout,
    362         verify=verify,
    363         allow_redirects=True,
    364         **(get_kwargs or {}),
    365     )
    366 else:
    367     raise e

File ~/mambaforge/envs/pydap_docs/lib/python3.11/site-packages/requests_cache/session.py:133, in CacheMixin.get(self, url, params, **kwargs)
    131 def get(self, url: str, params=None, **kwargs) -> AnyResponse:  # type: ignore
    132     kwargs.setdefault('allow_redirects', True)
--> 133     return self.request('GET', url, params=params, **kwargs)

File ~/mambaforge/envs/pydap_docs/lib/python3.11/site-packages/requests_cache/session.py:189, in CacheMixin.request(self, method, url, headers, expire_after, only_if_cached, refresh, force_refresh, *args, **kwargs)
    187 headers = set_request_headers(headers, expire_after, only_if_cached, refresh, force_refresh)
    188 with patch_form_boundary() if kwargs.get('files') else nullcontext():
--> 189     return super().request(method, url, *args, headers=headers, **kwargs)

File ~/mambaforge/envs/pydap_docs/lib/python3.11/site-packages/requests/sessions.py:651, in Session.request(self, method, url, params, data, headers, cookies, files, auth, timeout, allow_redirects, proxies, hooks, stream, verify, cert, json)
    646 send_kwargs = {
    647     "timeout": timeout,
    648     "allow_redirects": allow_redirects,
    649 }
    650 send_kwargs.update(settings)
--> 651 resp = self.send(prep, **send_kwargs)
    653 return resp

File ~/mambaforge/envs/pydap_docs/lib/python3.11/site-packages/requests_cache/session.py:255, in CacheMixin.send(self, request, expire_after, only_if_cached, refresh, force_refresh, **kwargs)
    253     response = self._resend(request, actions, cached_response, **kwargs)  # type: ignore
    254 elif actions.send_request:
--> 255     response = self._send_and_cache(request, actions, cached_response, **kwargs)
    256 else:
    257     response = cached_response  # type: ignore  # Guaranteed to be non-None by this point

File ~/mambaforge/envs/pydap_docs/lib/python3.11/site-packages/requests_cache/session.py:279, in CacheMixin._send_and_cache(self, request, actions, cached_response, **kwargs)
    275 """Send a request and cache the response, unless disabled by settings or headers.
    276 If applicable, also handle conditional requests.
    277 """
    278 request = actions.update_request(request)
--> 279 response = super().send(request, **kwargs)
    280 actions.update_from_response(response)
    282 if not actions.skip_write:

File ~/mambaforge/envs/pydap_docs/lib/python3.11/site-packages/requests/sessions.py:784, in Session.send(self, request, **kwargs)
    781 start = preferred_clock()
    783 # Send the request
--> 784 r = adapter.send(request, **kwargs)
    786 # Total elapsed time of the request (approximately)
    787 elapsed = preferred_clock() - start

File ~/mambaforge/envs/pydap_docs/lib/python3.11/site-packages/requests/adapters.py:720, in HTTPAdapter.send(self, request, stream, timeout, verify, cert, proxies)
    717         raise ConnectTimeout(e, request=request)
    719 if isinstance(e.reason, ResponseError):
--> 720     raise RetryError(e, request=request)
    722 if isinstance(e.reason, _ProxyError):
    723     raise ProxyError(e, request=request)

RetryError: HTTPConnectionPool(host='test.opendap.org', port=80): Max retries exceeded with url: /opendap/data/ncml/agg/aggExisting.ncml.dmr (Caused by ResponseError('too many 500 error responses'))

What happens if we download a single data point?#

ds['T']

Note

The info about chunking in T implies the entire array is treated as a single chunk! This is a stardard interpretation that Xarray makes of OPeNDAP urls. What happens if I download a subset of the data?

# clear the cache to inspect what is being downloaded
session.cache.clear() 
%%time
ds['T'].isel(time=1, lon=0, lat=0).load()
print("====================================== \n Request sent to the Remote Server:\n ", session.cache.urls()[0].split("?")[-1].split("&dap4.checksum")[0].replace("%5B","[").replace("%5D","]").replace("%3A",":").replace("%2F","/"), "\n====================================== ")

The constraint expression is built from the .isel Xarray method and correctly passed to the server, which does all the subsetting work!

Case 2) Subsetting across two separate files.#

The two files can be found in the test server, named: coads_climatology and coads_climatology2. These two datasets share identical spatial dimensions, can be aggregated in time, and share almost all identical variables.

Note

It is important to always check that datasets can be aggregated. PyDAP and Xarray have internal logic to check if any two or more datasets can be concatenated. But all these safety checks only take into account dimensions and cooordinates.

An important step will be the use of Constraint Expressions (CEs) to ensure that only the variables of interest are concatenating.

Warning

One of these files has extra variables not present in the other file, and that we will discarded by the use of CEs.

urls = ["http://test.opendap.org/opendap/data/nc/coads_climatology.nc", "http://test.opendap.org/opendap/data/nc/coads_climatology2.nc"]
dap4_urls = [url.replace("http","dap4") for url in urls]

# constraint expression
dap4_CE = "?dap4.ce=" + ";".join(["/SST", "/COADSX", "/COADSY", "/TIME"])

# Final list of OPeNDAP URLs
dap4ce_urls =[url+dap4_CE for url in dap4_urls]
print("====================================================\nThe following are the DAP4 OPeNDAP URLs \n", dap4ce_urls)

Note

Q: Why use CEs when Xarray has a .drop_variables method? Because Xarray needs to first parse ALL of the metadata associated with the remote file first, to subsequently drop the variables. In some remote files, there could be 1000s of variables. Xarray would then parse all these, and them drop them. With the CE, the server sends a Constrained Metadata associated with only the desired variables.

Warning

Xarray expects the presence of dimension in the metadata. When constructing CEs, the user needs to make sure to include all the dimensions associated with the variables of interest in the CE. In the example above, COASX, COADSY, and TIME are the dimensions of SST.

Consolidate Metadata speeds up the Dataset generation.#

consolidate_metadata(dap4ce_urls, session=session, concat_dim="TIME")

Note

consolidate_metadata(dap4_urls, concat_dim='...', session=session) downloads the dimensions of the remote file and stores them as a SQLite, to be reused. The session object becomes a way to authenticate, and act as a database manager! This practice can result in a performance gain of ~ 10-100 times faster workflows!

Use Xarray logic to download data.#

ds = xr.open_mfdataset(
    dap4ce_urls, 
    engine='pydap',
    concat_dim='TIME',
    session=session,
    combine="nested",
    parallel=True,
    decode_times=False,
)
ds
ds['SST']

Note

The chunking of SST implies the entire array within each file is a single chunk! This is a stardard interpretation that Xarray makes of OPeNDAP urls. What if we download a single spatial point from a single remote file?

session.cache.clear()
%%time
ds['SST'].isel(TIME=0, COADSX=0, COADSY=0).load() # this should download a single point one of the files
print("====================================== \n Request sent to the Remote Server:\n ", session.cache.urls()[0].split("?")[-1].split("&dap4.checksum")[0].replace("%5B","[").replace("%5D","]").replace("%3A",":").replace("%2F","/"), "\n====================================== ")

The entire variable is unnecessarily downloaded !!#

Ideally we would want the see the following Request (in the constraint expressssion) sent to the Remote Server:

dap4.ce=/SST[0:1:0][0:1:0][0:1:0]

It seems that xr.open_mfdataset does not pass the slice argument to the server for each remote dataset. Instead it downloads all the chunk (i.e. the data array) in a single request, subsets it, and then aggregates the data.

How to pass the slice from Xarray to the Remote Server#

The answer is to chunk the dataset when creating it. The chunk should match the expected size of your subset. That way the subset will be processed within a single request per remote file.

Warning

If you chunk the dataset with a size smaller that your expected download, you will trigger many downloads per remote file, forcing Xarray extra work to assemble the data together.

# consolidate metadata again, since the cached metadata was cleared before
consolidate_metadata(dap4ce_urls, session=session, concat_dim="TIME")
# For a single element in all dimensions, the expected size of the download is:
expected_sizes = {"TIME":1, "COADSX":1, "COADSY":1}
%%time
ds = xr.open_mfdataset(
    dap4ce_urls, 
    engine='pydap',
    concat_dim='TIME',
    session=session,
    combine="nested",
    parallel=True,
    decode_times=False,
    chunks=expected_sizes,
)
session.cache.clear()
ds['SST'] # inspect chunks before download
%%time
ds['SST'].isel(TIME=0, COADSX=0, COADSY=0).load() # triggers download of an individual chunk
print("====================================== \n Request sent to the Remote Server:\n ", session.cache.urls()[0].split("?")[-1].split("&dap4.checksum")[0].replace("%5B","[").replace("%5D","]").replace("%3A",":").replace("%2F","/"), "\n====================================== ")

Warning: Be cautious about chunking#

We now only downloaded exactly what we requested! However, in some scenarios the time for download can be 10x slower, compared to the case when we requested more data!! The reason for the slowdown can sometimes be attributed to the number of chunks the dask graph generated.

  • No chunking. Download all the array in the file. 2 chunks in 5 dask graphs (one per file).

  • Chunking. Download only the desired element of a file. 388800 chunks in 5 dask graphs.

Ideally, the chunk manager should only trigger the download of a single chunk. However, 388800 were created to ensure passing the slice to the server. This, can sometimes lead to slowdowns on the client side.

In the scenario above, we went to the extremes. It is better to find a chunk compromise. We demonstrate that below, but now subsetting across all time (across both files).

consolidate_metadata(dap4ce_urls, session=session, concat_dim="TIME")
download_sizes = {"COADSY":1} # note that we will subset across all time
%%time
ds = xr.open_mfdataset(
    dap4ce_urls, 
    engine='pydap',
    concat_dim='TIME',
    session=session,
    combine="nested",
    parallel=True,
    decode_times=False,
    chunks=download_sizes,
)
session.cache.clear()
ds['SST']
%%time
ds['SST'].isel(COADSX=0, COADSY=0).load()
print("====================================== \n Parallel Requests sent to the Remote Server:\n ", [url.split("?")[-1].split("&dap4.checksum")[0].replace("%5B","[").replace("%5D","]").replace("%3A",":").replace("%2F","/") for url in session.cache.urls()], "\n====================================== ")

Success! Similar timings but much and smaller download!#