import pystac_client
from odc import stac as odc_stac
import xarray as xr
import rioxarray
import numpy as np
import hvplot.xarray
Dask makes parallel computing easy by providing a familiar API to common libraries, such as Pandas and NumPy. This allows efficient scaling of the workflow presented here, an adaptation of the TU Wien Bayesian flood mapping algorithm. Data size becomes the main limiting factor as the data grows larger than RAM. For this reason we partition our data into chunks, which Dask's task scheduler distributes to the machine's workers as efficiently as possible. Although many of Dask's settings can be handled automatically, we can also tune some parameters for optimal performance of the workflow in the desired processing environment. So note that this highly depends on your own machine's specifications.
I will, furthermore, set the temporary directory Dask uses when spilling data from worker memory to disk.
import dask
dask.config.set(temporary_directory='/tmp')
<dask.config.set at 0x7fba7d43d9c0>
We can then set up the Dask Client, avoiding inter-worker communication, as is common practice when working with numpy and xarray, as in this case.
from dask.distributed import Client, wait
client = Client(processes=False, threads_per_worker=2,
                n_workers=3, memory_limit='28GB')
client
In conjunction with setting up the Dask client I will chunk my arrays along three dimensions according to the following specifications for maximum performance on my setup.
chunks = {'time': 1, "latitude": 1300, "longitude": 1300}
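With float32 data, one such chunk is modest in size, which keeps many chunks comfortably in flight at once. As a quick back-of-the-envelope check of my own (not part of the original workflow):

chunk_bytes = 1 * 1300 * 1300 * 4  # time x latitude x longitude x 4 bytes (float32)
print(f"{chunk_bytes / 1024**2:.1f} MB per chunk")  # roughly 6.4 MB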
1.1 Cube Definitions
The following generic specifications are used for presenting the data.
= "EPSG:4326" # Coordinate Reference System - World Geodetic System 1984 (WGS84) in this case
crs = 0.00018 # 20 meter in degree res
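The degree value follows from the metric resolution; as a rough check (assuming about 111.32 km per degree of latitude):

print(20 / 111320)  # 20 meters expressed in degrees, ~0.00018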
1.2 Northern Germany Flood
Storm Babet hit Denmark and the Northern German coast on the 20th of October 2023 (Wikipedia). Here an area around Zingst on the Baltic coast of Northern Germany is selected as the study area.
= "2022-10-11/2022-10-25"
time_range = 12.3, 13.1
minlon, maxlon = 54.3, 54.6
minlat, maxlat = [minlon, minlat, maxlon, maxlat] bounding_box
1.3 EODC STAC Catalog
The pystac_client establishes a connection to the EODC STAC Catalog. This results in a catalog object that can be used to discover collections and items hosted at EODC.
eodc_catalog = pystac_client.Client.open("https://stac.eodc.eu/api/v1")
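If you are unsure which collections are hosted, the catalog object can list them; get_collections is standard pystac_client API, although the returned IDs of course depend on the EODC catalog contents:

for collection in eodc_catalog.get_collections():  # iterate over available collections
    print(collection.id)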
1.4 Microwave Backscatter Measurements
The basic premise of microwave-based backscattering can be seen in the sketch below: the characteristics of backscattering over land and water differ considerably. With this knowledge we can detect when a pixel with a predominantly land-like signature changes to a water-like signature in the event of flooding.
Schematic backscattering over land and water. Image from Geological Survey Ireland
We discover Sentinel-1 microwave backscatter (\(\sigma^0\) [1]) at a 20 meter resolution, like so:
search = eodc_catalog.search(
    collections="SENTINEL1_SIG0_20M",
    bbox=bounding_box,
    datetime=time_range,
)

items_sig0 = search.item_collection()
The state of the orbit and the relative orbit number are also saved, as the water and land likelihoods (which are calculated later on) depend on the orbital configuration. These variables will be added as additional coordinates to the data cube. For this purpose a small helper function is defined, like so:
def extract_orbit_names(items):
    return np.array([items[i].properties["sat:orbit_state"][0].upper() + \
                     str(items[i].properties["sat:relative_orbit"]) \
                     for i in range(len(items))])
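Applied to the discovered items, this yields labels such as 'D168', i.e. the orbit direction (ascending or descending) concatenated with the relative orbit number:

print(extract_orbit_names(items_sig0))  # e.g. ['D168' 'A44' ...]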
We will also save the scaling factor and nodata values of the STAC items to correct the loaded data accordingly. Again, a helper function is used to correctly scale the data and fill its nodata values.
def post_process_eodc_cube(dc: xr.Dataset, items, bands):
    if not isinstance(bands, tuple):
        bands = tuple([bands])
    for i in bands:
        dc[i] = post_process_eodc_cube_(dc[i], items, i)
    return dc

def post_process_eodc_cube_(dc: xr.DataArray, items, band):
    scale = items[0].assets[band].extra_fields.get('raster:bands')[0]['scale']
    nodata = items[0].assets[band].extra_fields.get('raster:bands')[0]['nodata']
    return dc.where(dc != nodata) / scale
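To illustrate what the helper does, here is a minimal self-contained sketch with made-up scale and nodata values (the real values are read from the STAC metadata):

demo = xr.DataArray([-9999, 100, 200], dims="x")  # hypothetical raw values, nodata = -9999
print(demo.where(demo != -9999) / 10)  # nodata becomes NaN, the rest is divided by scale 10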
The VV polarization of the discovered items can be loaded with odc-stac and cast into the desired projection and resolution. At this point the data is only lazily loaded, meaning that we only create an instance of the outline of the datacube with the proper shape and resolution, without actually loading all the data. This is done by providing the chunks as defined before, which partition the data into portions that are more easily handled by the setup used for processing the data (in this case my own PC).
= "VV"
bands = odc_stac.load(items_sig0,
sig0_dc =bands,
bands=crs,
crs=chunks,
chunks=res,
resolution=bounding_box,
bbox=None,
groupby )
Now we can rescale our variable, fill the nodata values with np.nan, and add the orbit names, using the previously defined functions.
sig0_dc = post_process_eodc_cube(sig0_dc, items_sig0, bands).\
    rename_vars({"VV": "sig0"}).\
    assign_coords(orbit=("time", extract_orbit_names(items_sig0))).\
    dropna(dim="time", how="all").\
    sortby("time")
Then we remove duplicate time steps from the data cube and extract the orbit names, as we will need those for obtaining the correct harmonic parameters and local incidence angles, as explained in the next section. Also note that we call dask.persist to materialize the object while retaining it as a delayed object in the workers' memory.
__, indices = np.unique(sig0_dc.time, return_index=True)
indices.sort()
orbit_sig0 = sig0_dc.orbit[indices].data
sig0_dc = sig0_dc.groupby("time").mean(skipna=True)
sig0_dc = sig0_dc.assign_coords(orbit=("time", orbit_sig0))
sig0_dc = sig0_dc.persist()
wait(sig0_dc)
sig0_dc
<xarray.Dataset> Size: 237MB Dimensions: (time: 8, latitude: 1668, longitude: 4445) Coordinates: * latitude (latitude) float64 13kB 54.6 54.6 54.6 54.6 ... 54.3 54.3 54.3 * longitude (longitude) float64 36kB 12.3 12.3 12.3 12.3 ... 13.1 13.1 13.1 spatial_ref int32 4B 4326 * time (time) datetime64[ns] 64B 2022-10-11T05:25:01 ... 2022-10-23... orbit (time) <U4 128B 'D168' 'A44' 'A44' ... 'A146' 'A146' 'D168' Data variables: sig0 (time, latitude, longitude) float32 237MB dask.array<chunksize=(1, 1300, 1300), meta=np.ndarray>
1.5 Harmonic Parameters
The so-called likelihoods \(P(\sigma^0|flood)\) and \(P(\sigma^0|nonflood)\) can be calculated from past backscattering information. To be able to do this, we load the harmonic parameters, with which we can model the expected variations in land backscattering based on seasonal changes in vegetation. The procedure is similar to the backscattering routine.
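A common way to write such a harmonic model (a sketch of the general form; the exact convention used for the TU Wien parameters may differ) is

\[ \hat{\sigma}^0(t) = M0 + \sum_{k=1}^{3} \left( C_k \cos\frac{2\pi k t}{365} + S_k \sin\frac{2\pi k t}{365} \right), \]

where \(t\) is the day of year, \(M0\) the mean backscatter, \(C_k\) and \(S_k\) the harmonic coefficients, and STD the standard deviation of the residuals.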
We discover items.
search = eodc_catalog.search(
    collections="SENTINEL1_HPAR",
    bbox=bounding_box,
)

items_hpar = search.item_collection()
Load the data as a lazy object.
= ("C1", "C2", "C3", "M0", "S1", "S2", "S3", "STD")
bands = odc_stac.load(items_hpar,
hpar_dc =bands,
bands=crs,
crs=chunks,
chunks=res,
resolution=bounding_box,
bbox=None,
groupby
)
hpar_dc = post_process_eodc_cube(hpar_dc, items_hpar, bands).\
    rename({"time": "orbit"})
hpar_dc["orbit"] = extract_orbit_names(items_hpar)
hpar_dc = hpar_dc.groupby("orbit").mean(skipna=True)
We expand the variables along the orbits of sigma naught to be able to calculate the correct land reference backscatter signatures.
hpar_dc = hpar_dc.sel(orbit=orbit_sig0)
hpar_dc = hpar_dc.persist()
wait(hpar_dc)
hpar_dc
<xarray.Dataset> Size: 2GB Dimensions: (orbit: 8, latitude: 1668, longitude: 4445) Coordinates: * latitude (latitude) float64 13kB 54.6 54.6 54.6 54.6 ... 54.3 54.3 54.3 * longitude (longitude) float64 36kB 12.3 12.3 12.3 12.3 ... 13.1 13.1 13.1 spatial_ref int32 4B 4326 * orbit (orbit) object 64B 'D168' 'A44' 'A44' ... 'A146' 'A146' 'D168' Data variables: C1 (orbit, latitude, longitude) float32 237MB dask.array<chunksize=(1, 1300, 1300), meta=np.ndarray> C2 (orbit, latitude, longitude) float32 237MB dask.array<chunksize=(1, 1300, 1300), meta=np.ndarray> C3 (orbit, latitude, longitude) float32 237MB dask.array<chunksize=(1, 1300, 1300), meta=np.ndarray> M0 (orbit, latitude, longitude) float32 237MB dask.array<chunksize=(1, 1300, 1300), meta=np.ndarray> S1 (orbit, latitude, longitude) float32 237MB dask.array<chunksize=(1, 1300, 1300), meta=np.ndarray> S2 (orbit, latitude, longitude) float32 237MB dask.array<chunksize=(1, 1300, 1300), meta=np.ndarray> S3 (orbit, latitude, longitude) float32 237MB dask.array<chunksize=(1, 1300, 1300), meta=np.ndarray> STD (orbit, latitude, longitude) float32 237MB dask.array<chunksize=(1, 1300, 1300), meta=np.ndarray>