Tuning your application

It is possible to load data at high bandwidth using katdal: rates over 2.5 GB/s have been seen when loading from a local disk. However, this requires an understanding of the storage layout and the choice of an appropriate access pattern.

This chapter is aimed at loading MVF version 4 (MeerKAT) data, as older versions typically contain far less data. Some of the advice is generic but some of the methods described here will not work on older data sets.

Chunking

The most important thing to understand is that the data is split into chunks, each of which is stored as a file on disk or an object in an S3 store. Retrieving any element of a chunk causes the entire chunk to be retrieved. Thus, aligning accesses to whole chunks will give the best performance, because none of the retrieved data is discarded.

As an illustration, consider an application that has an outer loop over the baselines, and loads data for one baseline at a time. Chunks typically span all baselines, so each time one baseline is loaded, katdal will actually load the entire data set. If the application can be redesigned to fetch data for a small time range for all baselines, it will perform much better.
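The cost of the two access patterns can be estimated with a little arithmetic. The sketch below is not part of katdal: chunks_fetched is a hypothetical helper that counts chunk retrievals, assuming that requests tile the array starting from index 0, that nothing is cached between requests, and that on each axis the request size is a multiple or a divisor of the chunk size (or covers the whole axis). The shape and chunk size are illustrative values for an array of 38 dumps, 4096 channels and 40 baselines.

```python
import math


def chunks_fetched(shape, chunk_shape, request_shape):
    """Count chunk retrievals needed to read the whole array piece by piece.

    Hypothetical helper for illustration only: assumes requests tile the
    array from index 0 and that no chunk is cached between requests.
    """
    total = 1
    for size, chunk, request in zip(shape, chunk_shape, request_shape):
        n_requests = math.ceil(size / request)        # requests along this axis
        chunks_per_request = math.ceil(request / chunk)  # chunks touched by each
        total *= n_requests * chunks_per_request
    return total


shape = (38, 4096, 40)        # (dumps, channels, baselines), illustrative
chunk_shape = (32, 1024, 40)  # each chunk spans all 40 baselines

# Outer loop over baselines: every request touches every chunk.
per_baseline = chunks_fetched(shape, chunk_shape, (38, 4096, 1))
# Outer loop over time, one time chunk for all baselines per request.
per_time_chunk = chunks_fetched(shape, chunk_shape, (32, 4096, 40))
print(per_baseline, per_time_chunk)
```

Under these assumptions the per-baseline loop retrieves each of the 8 chunks 40 times, while the time-ordered loop retrieves each chunk once.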

When using MVFv4, katdal uses dask to manage the chunking. After opening a data set, you can determine the chunking for a particular array by examining its dataset member:

>>> d.vis.dataset
dask.array<1556179171-sdp, shape=(38, 4096, 40), dtype=complex64, chunksize=(32, 1024, 40)>
>>> d.vis.dataset.chunks
((32, 6), (1024, 1024, 1024, 1024), (40,))

For this data set, it will be optimal to load visibilities in 32 × 1024 × 40 element pieces.

Note that the chunking scheme may be different for visibilities, flags and weights.
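The chunks tuple can be turned into chunk-aligned index ranges with a few lines of plain Python. This is a minimal sketch rather than a katdal function: chunk_slices is a hypothetical helper, and the tuple passed to it is the time-axis entry of d.vis.dataset.chunks from the example above. Because the chunking may differ between arrays, it would need to be evaluated for each array that is loaded.

```python
from itertools import accumulate


def chunk_slices(axis_chunks):
    """Convert dask-style chunk sizes for one axis into chunk-aligned slices.

    Hypothetical helper for illustration; not part of katdal or dask.
    """
    stops = list(accumulate(axis_chunks))  # cumulative end index of each chunk
    starts = [0] + stops[:-1]              # each chunk starts where the last ended
    return [slice(start, stop) for start, stop in zip(starts, stops)]


time_chunks = (32, 6)  # d.vis.dataset.chunks[0] in the example above
for idx in chunk_slices(time_chunks):
    print(idx.start, idx.stop)
```

Each resulting slice can then be used as the time index when loading data, so that no chunk is retrieved more than once.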

Joint loading

The values returned by katdal are not the raw values stored in the chunks: there is processing involved, such as application of calibration solutions and flagging of missing data. Some of this processing is common between visibilities, flags and weights. It’s thus more efficient to load the visibilities, flags and weights as a single operation rather than as three separate operations.

This can be achieved using DaskLazyIndexer.get(). For example, replace

vis = d.vis[idx]
flags = d.flags[idx]
weights = d.weights[idx]

with

from katdal.lazy_indexer import DaskLazyIndexer
vis, flags, weights = DaskLazyIndexer.get([d.vis, d.flags, d.weights], idx)
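In a real script the joint load usually sits inside a loop over blocks of dumps. The following is a sketch under stated assumptions, not a katdal recipe: block_slices and iter_blocks are hypothetical names, d is assumed to be an open MVFv4 data set, and dumps_per_block is assumed to be chosen by the caller (ideally a multiple of the time chunk size). The katdal import is placed inside the function only so that the slicing helper can be tried without katdal installed.

```python
def block_slices(n_dumps, dumps_per_block):
    """Split range(n_dumps) into consecutive slices of at most dumps_per_block."""
    return [slice(start, min(start + dumps_per_block, n_dumps))
            for start in range(0, n_dumps, dumps_per_block)]


def iter_blocks(d, dumps_per_block):
    """Yield (idx, vis, flags, weights) for successive blocks of dumps.

    Hypothetical helper: `d` is assumed to be an open MVFv4 data set.
    """
    from katdal.lazy_indexer import DaskLazyIndexer  # imported lazily, see above

    n_dumps = d.vis.shape[0]
    for idx in block_slices(n_dumps, dumps_per_block):
        # One joint load per block instead of three separate ones
        vis, flags, weights = DaskLazyIndexer.get(
            [d.vis, d.flags, d.weights], idx)
        yield idx, vis, flags, weights


print(block_slices(38, 32))
```

A caller would then write something like "for idx, vis, flags, weights in iter_blocks(d, 32): ..." and process each block before the next one is loaded.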

Parallelism

Dask uses multiple worker threads. It defaults to one thread per CPU core, but for I/O-bound tasks this is often not enough to achieve maximum throughput. Refer to the dask scheduler documentation for details of how to configure the number of workers.

Adding more workers only helps if there is enough parallel work to be performed, which means that at least as many chunks (and preferably many more) need to be loaded at a time as there are workers. It’s thus advisable to load as much data at a time as possible without running out of memory.
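As a rough guide to how much data fits in memory at once, the size of one dump can be computed from the array shape. The sketch below rests on several assumptions that should be checked against the actual data set: visibilities are complex64 (8 bytes), flags take 1 byte and weights are float32 (4 bytes) per sample, and dumps_per_load and load_with_workers are hypothetical helpers rather than katdal functions. The second helper uses dask.config.set(num_workers=...), which applies to dask's threaded scheduler.

```python
# Assumed sample sizes: complex64 vis + 1-byte flags + float32 weights
BYTES_PER_SAMPLE = 8 + 1 + 4


def dumps_per_load(memory_budget, n_channels, n_baselines, time_chunk):
    """Number of dumps to load at once within memory_budget bytes.

    The result is rounded down to a whole number of time chunks, but is
    never less than one chunk. Hypothetical helper for illustration.
    """
    bytes_per_dump = n_channels * n_baselines * BYTES_PER_SAMPLE
    dumps = memory_budget // bytes_per_dump
    return max(time_chunk, dumps // time_chunk * time_chunk)


def load_with_workers(d, idx, num_workers):
    """Jointly load one block using a given number of dask worker threads.

    `d` is assumed to be an open MVFv4 data set. Imports are local so the
    arithmetic above can be tried without dask or katdal installed.
    """
    import dask
    from katdal.lazy_indexer import DaskLazyIndexer

    with dask.config.set(num_workers=num_workers):
        return DaskLazyIndexer.get([d.vis, d.flags, d.weights], idx)


# 1 GiB budget, 4096 channels, 40 baselines, 32 dumps per time chunk
print(dumps_per_load(2**30, 4096, 40, 32))
```

Note that this estimate covers only the returned arrays; intermediate buffers used during loading and any copies made by the application need additional headroom.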

Selection

Using DataSet.select() is relatively expensive. For the best performance, it should only be used occasionally (for example, to filter out unwanted data at the start), with array access notation or DaskLazyIndexer.get() used to break up large data sets into manageable pieces.

Dask also performs better with selections that select contiguous data. You might be able to get a little more performance by using DataSet.scans() (which will yield a series of contiguous selections) rather than using select() with scans='track'.
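A loop over DataSet.scans() might be structured as follows. This is a sketch, assuming that scans() yields (scan index, state, target) tuples and leaves the data set selected on the current scan while the loop body runs; iter_track_scans is a hypothetical helper, not a katdal function.

```python
def iter_track_scans(d):
    """Yield (scan_index, target) for each scan whose state is 'track'.

    Hypothetical helper: `d` is assumed to be an open katdal data set whose
    scans() method yields (scan_index, state, target) tuples.
    """
    for scan_index, state, target in d.scans():
        if state == 'track':
            # While this scan is current, d.vis, d.flags and d.weights
            # cover only the contiguous dumps of this scan.
            yield scan_index, target
```

Inside a loop such as "for scan_index, target in iter_track_scans(d): ..." the data for the current scan can then be loaded with array access notation or DaskLazyIndexer.get().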

When using MVFv4, one can also pass a preselect parameter to katdal.open(), which selects a subset of the data in time and frequency. It is more limited than DataSet.select() (it can only select contiguous ranges, and can only specify the selection in terms of channels and dumps), but if a script is only interested in working on a subset of the data, this method can be more efficient and uses less memory.
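A preselect call could look like the sketch below. It assumes that preselect is a dictionary whose keys are dumps and channels and whose values are slices with unit step; this should be checked against the katdal.open() documentation for the installed version. make_preselect and open_subset are hypothetical helpers, and the URL is left for the caller to supply.

```python
def make_preselect(first_dump, last_dump, first_channel, last_channel):
    """Build a preselect dictionary of contiguous dump and channel ranges.

    Hypothetical helper; the key names are an assumption (see above).
    End indices are exclusive, as for any Python slice.
    """
    return dict(dumps=slice(first_dump, last_dump),
                channels=slice(first_channel, last_channel))


def open_subset(url, first_dump, last_dump, first_channel, last_channel):
    """Open an MVFv4 data set restricted to a contiguous subset."""
    import katdal  # imported lazily so make_preselect works without katdal

    preselect = make_preselect(first_dump, last_dump,
                               first_channel, last_channel)
    return katdal.open(url, preselect=preselect)


print(make_preselect(0, 100, 1024, 2048))
```

Choosing ranges that start and end on chunk boundaries keeps the accesses chunk-aligned, in line with the advice on chunking.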

Network versus local disk

When loading data from the network, latency is typically higher, and so more workers will be needed to achieve peak throughput. Network access is also more sensitive to access patterns that are misaligned with chunks, because chunks are not cached in memory by the operating system and hence must be re-fetched over the network if they are accessed again.

Benchmarking

To assist with testing out the effects of changing these tuning parameters, the katdal source code includes a script called mvf_read_benchmark.py that allows a data set to be loaded in various ways and reports the average throughput. The command-line options are somewhat limited so you may need to edit it yourself, for example, to add a custom selection.