1  Dask Flood Mapper

import pystac_client
from odc import stac as odc_stac
import xarray as xr
import rioxarray
import numpy as np
import hvplot.xarray

Dask makes parallel computing easy by providing a familiar API to common libraries such as Pandas and NumPy. This allows the workflow presented here, an adaptation of the TU Wien Bayesian flood mapping algorithm, to scale efficiently. Data size becomes the main limiting factor once the data grows larger than RAM. For this reason we partition the data into chunks, which Dask's task scheduler presents to the workers in the most efficient manner. Although many of Dask's settings can be handled automatically, we can also tune some parameters for optimal performance in the desired processing environment. Note that these settings highly depend on your own machine's specifications.

I will, furthermore, set the temporary directory Dask uses when it spills data from the workers' memory to disk.

import dask
dask.config.set(temporary_directory='/tmp')
<dask.config.set at 0x7fba7d43d9c0>

We can then set up the Dask client. By setting processes=False the workers run as threads within a single process, which avoids inter-worker data transfer; this is a common choice when working with NumPy and xarray, as is the case here.

from dask.distributed import Client, wait
client = Client(processes=False, threads_per_worker=2,
                n_workers=3, memory_limit='28GB')
client

In conjunction with setting up the Dask client, I chunk the arrays along three dimensions according to the following specifications for maximum performance on my setup.

chunks = {'time':1, "latitude": 1300, "longitude": 1300}

1.1 Cube Definitions

The following generic specifications are used for presenting the data.

crs = "EPSG:4326" # Coordinate Reference System - World Geodetic System 1984 (WGS84) in this case 
res = 0.00018 # 20 meter in degree
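
As a rough sanity check of this value, one degree of latitude spans about 111.32 km, so 20 meters corresponds to approximately 0.00018 degrees (a simplification that ignores the latitude dependence of longitudinal spacing):

meters_per_degree = 111_320  # approximate length of one degree at the equator
print(round(20 / meters_per_degree, 5))  # 0.00018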

1.2 Northern Germany Flood

Storm Babet hit Denmark and the Northern German coast on the 20th of October 2023 (Wikipedia). Here an area around Zingst on the Baltic coast of Northern Germany is selected as the study area.

time_range = "2022-10-11/2022-10-25"
minlon, maxlon = 12.3, 13.1
minlat, maxlat = 54.3, 54.6
bounding_box = [minlon, minlat, maxlon, maxlat]

1.3 EODC STAC Catalog

The pystac_client establishes a connection to the EODC STAC Catalog. This results in a catalog object that can be used to discover collections and items hosted at EODC.

eodc_catalog = pystac_client.Client.open("https://stac.eodc.eu/api/v1")
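
As a quick, optional check (not part of the original workflow), the catalog object can list the collections it hosts; the sketch below simply prints the first few collection ids:

# print a handful of collection ids available in the EODC catalog
for collection in list(eodc_catalog.get_collections())[:5]:
    print(collection.id)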

1.4 Microwave Backscatter Measurements

The basic premise of microwave-based backscattering can be seen in the sketch below: the characteristics of backscattering over land and water differ considerably. With this knowledge we can detect when a pixel with a predominantly land-like signature changes to a water-like signature in the event of flooding.

Schematic backscattering over land and water. Image from Geological Survey Ireland

We discover Sentinel-1 microwave backscatter (\(\sigma^0\) [1]) at a 20 meter resolution, like so:

search = eodc_catalog.search(
    collections="SENTINEL1_SIG0_20M",
    bbox=bounding_box,
    datetime=time_range,
)

items_sig0 = search.item_collection()
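
It can be useful to verify what the search returned before loading anything; for instance, the number of items and the sensing time of the first item (both standard STAC fields):

# quick inspection of the discovered items
print(f"Found {len(items_sig0)} items")
print(items_sig0[0].properties["datetime"])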

The orbit state and the relative orbit number are also saved, as the water and land likelihoods (which are calculated later on) depend on the orbital configuration. These variables will be added as additional coordinates to the data cube. For this purpose a small helper function is defined, like so:

def extract_orbit_names(items):
    return np.array([items[i].properties["sat:orbit_state"][0].upper() + \
                     str(items[i].properties["sat:relative_orbit"]) \
                     for i in range(len(items))])
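
Applied to the discovered items, the helper returns labels that combine the orbit state ('A' for ascending, 'D' for descending) with the relative orbit number; for example, something along the lines of:

extract_orbit_names(items_sig0)  # e.g. array(['D168', 'A44', ...], dtype='<U4')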

We will also save the scaling factor and nodata values of the STAC items to correct the loaded data accordingly. Again, a helper function is used to correctly scale the data and fill the nodata values of the cube.

def post_process_eodc_cube(dc: xr.Dataset, items, bands):
    if not isinstance(bands, tuple):
        bands = tuple([bands])
    for i in bands:
        dc[i] = post_process_eodc_cube_(dc[i], items, i)
    return dc

def post_process_eodc_cube_(dc: xr.DataArray, items, band):
    # read scale and nodata from the STAC raster metadata of the first item
    scale = items[0].assets[band].extra_fields.get('raster:bands')[0]['scale']
    nodata = items[0].assets[band].extra_fields.get('raster:bands')[0]['nodata']
    return dc.where(dc != nodata) / scale

The VV polarization of the discovered items can be loaded with odc-stac and cast into the desired projection and resolution. At this point the data is only lazily loaded, meaning that we only create an instance of the outline of the datacube with the proper shape and resolution, without actually loading all the data. This is done by providing the chunks defined before, which partition the data into portions that are more easily handled by the setup used for processing (in this case my own PC).

bands = "VV"
sig0_dc = odc_stac.load(items_sig0,
                        bands=bands,
                        crs=crs,
                        chunks=chunks,
                        resolution=res,
                        bbox=bounding_box,
                        groupby=None,
                        )

Now we can rescale our variable, fill the nodata values with np.nan, and add the orbit names, using the previously defined functions.

sig0_dc = post_process_eodc_cube(sig0_dc, items_sig0, bands).\
    rename_vars({ "VV": "sig0"}).\
    assign_coords(orbit=("time", extract_orbit_names(items_sig0))).\
    dropna(dim="time", how="all").\
    sortby("time")

Then we remove duplicate timestamps from the data cube and extract the orbit names, as we will need those for obtaining the correct harmonic parameters and local incidence angles, as explained in the next section. Also note that we call persist to materialize the object while keeping it as a set of Dask arrays in the workers' memory (unlike compute, which would pull the full result into the local session).

__, indices = np.unique(sig0_dc.time, return_index=True)
indices.sort()
orbit_sig0 = sig0_dc.orbit[indices].data
sig0_dc = sig0_dc.groupby("time").mean(skipna=True)
sig0_dc = sig0_dc.assign_coords(orbit=("time", orbit_sig0))
sig0_dc = sig0_dc.persist()
wait(sig0_dc)
sig0_dc
<xarray.Dataset> Size: 237MB
Dimensions:      (time: 8, latitude: 1668, longitude: 4445)
Coordinates:
  * latitude     (latitude) float64 13kB 54.6 54.6 54.6 54.6 ... 54.3 54.3 54.3
  * longitude    (longitude) float64 36kB 12.3 12.3 12.3 12.3 ... 13.1 13.1 13.1
    spatial_ref  int32 4B 4326
  * time         (time) datetime64[ns] 64B 2022-10-11T05:25:01 ... 2022-10-23...
    orbit        (time) <U4 128B 'D168' 'A44' 'A44' ... 'A146' 'A146' 'D168'
Data variables:
    sig0         (time, latitude, longitude) float32 237MB dask.array<chunksize=(1, 1300, 1300), meta=np.ndarray>

1.5 Harmonic Parameters

The so-called likelihoods \(P(\sigma^0|flood)\) and \(P(\sigma^0|nonflood)\) can be calculated from past backscattering information. To be able to do this, we load the harmonic parameters, with which we can model the expected variations in land backscattering due to seasonal changes in vegetation. The procedure is similar to the backscattering routine.

We discover items.

search = eodc_catalog.search(
    collections="SENTINEL1_HPAR",
    bbox=bounding_box
)

items_hpar = search.item_collection()

Load the data as a lazy object.

bands = ("C1", "C2", "C3", "M0", "S1", "S2", "S3", "STD")
hpar_dc = odc_stac.load(items_hpar,
                        bands=bands,
                        crs=crs,
                        chunks=chunks,
                        resolution=res,
                        bbox=bounding_box,
                        groupby=None,
                        )

hpar_dc = post_process_eodc_cube(hpar_dc, items_hpar, bands).\
    rename({"time": "orbit"})
hpar_dc["orbit"] = extract_orbit_names(items_hpar)
hpar_dc = hpar_dc.groupby("orbit").mean(skipna=True)

We expand the variables along the orbits of sigma naught to be able to calculate the correct land reference backscatter signatures.

hpar_dc = hpar_dc.sel(orbit = orbit_sig0)
hpar_dc = hpar_dc.persist()
wait(hpar_dc)
hpar_dc
<xarray.Dataset> Size: 2GB
Dimensions:      (orbit: 8, latitude: 1668, longitude: 4445)
Coordinates:
  * latitude     (latitude) float64 13kB 54.6 54.6 54.6 54.6 ... 54.3 54.3 54.3
  * longitude    (longitude) float64 36kB 12.3 12.3 12.3 12.3 ... 13.1 13.1 13.1
    spatial_ref  int32 4B 4326
  * orbit        (orbit) object 64B 'D168' 'A44' 'A44' ... 'A146' 'A146' 'D168'
Data variables:
    C1           (orbit, latitude, longitude) float32 237MB dask.array<chunksize=(1, 1300, 1300), meta=np.ndarray>
    C2           (orbit, latitude, longitude) float32 237MB dask.array<chunksize=(1, 1300, 1300), meta=np.ndarray>
    C3           (orbit, latitude, longitude) float32 237MB dask.array<chunksize=(1, 1300, 1300), meta=np.ndarray>
    M0           (orbit, latitude, longitude) float32 237MB dask.array<chunksize=(1, 1300, 1300), meta=np.ndarray>
    S1           (orbit, latitude, longitude) float32 237MB dask.array<chunksize=(1, 1300, 1300), meta=np.ndarray>
    S2           (orbit, latitude, longitude) float32 237MB dask.array<chunksize=(1, 1300, 1300), meta=np.ndarray>
    S3           (orbit, latitude, longitude) float32 237MB dask.array<chunksize=(1, 1300, 1300), meta=np.ndarray>
    STD          (orbit, latitude, longitude) float32 237MB dask.array<chunksize=(1, 1300, 1300), meta=np.ndarray>
1.6 Local Incidence Angles

The local incidence angle of the measured microwave backscatter is likewise important for calculating reference backscatter values, in this case for water bodies. The procedure is much the same as for the harmonic parameters.

search = eodc_catalog.search(
    collections="SENTINEL1_MPLIA",
    bbox=bounding_box
)

items_plia = search.item_collection()

Load the lazy object and preprocess.

    bands = "MPLIA"
    plia_dc = odc_stac.load(items_plia,
                            bands=bands,
                            crs=crs,
                            chunks=chunks,
                            resolution=res,
                            bbox=bounding_box,
                            groupby=None,
                            )
    
    plia_dc = post_process_eodc_cube(plia_dc, items_plia, bands).\
        rename({"time": "orbit"})
    plia_dc["orbit"] = extract_orbit_names(items_plia)
    plia_dc = plia_dc.groupby("orbit").mean(skipna=True)

We expand the variables along the orbits of sigma naught to be able to calculate the correct water reference backscatter signatures.

plia_dc = plia_dc.sel(orbit = orbit_sig0)
plia_dc = plia_dc.persist()
wait(plia_dc)
plia_dc
<xarray.Dataset> Size: 237MB
Dimensions:      (orbit: 8, latitude: 1668, longitude: 4445)
Coordinates:
  * latitude     (latitude) float64 13kB 54.6 54.6 54.6 54.6 ... 54.3 54.3 54.3
  * longitude    (longitude) float64 36kB 12.3 12.3 12.3 12.3 ... 13.1 13.1 13.1
    spatial_ref  int32 4B 4326
  * orbit        (orbit) object 64B 'D168' 'A44' 'A44' ... 'A146' 'A146' 'D168'
Data variables:
    MPLIA        (orbit, latitude, longitude) float32 237MB dask.array<chunksize=(1, 1300, 1300), meta=np.ndarray>

1.7 ESA World Cover from Terrascope

For flood mapping we are only interested in microwave backscattering over what used to be land; as such, we need a way to mask out permanent water bodies. For this we use the ESA World Cover data from the following STAC catalog. Setting the AWS_NO_SIGN_REQUEST environment variable lets the publicly hosted data be read anonymously, without AWS credentials.

import os
os.environ['AWS_NO_SIGN_REQUEST'] = 'YES'
wcover_catalog = pystac_client.Client.open('https://services.terrascope.be/stac/')

Similarly, we discover the required items.

search = wcover_catalog.search(
    collections="urn:eop:VITO:ESA_WorldCover_10m_2021_AWS_V2",
    bbox=bounding_box
)

items_wcover = search.item_collection()

Load the data lazily.

wcover_dc = odc_stac.load(items_wcover,
                          crs=crs,
                          chunks=chunks,
                          resolution=res,
                          bbox=bounding_box,
                          ).\
    squeeze("time").\
    drop_vars("time").\
    rename_vars({"ESA_WORLDCOVER_10M_MAP": "wcover"})
wcover_dc = wcover_dc.persist()
wait(wcover_dc)
wcover_dc
<xarray.Dataset> Size: 30MB
Dimensions:      (latitude: 1668, longitude: 4445)
Coordinates:
  * latitude     (latitude) float64 13kB 54.6 54.6 54.6 54.6 ... 54.3 54.3 54.3
  * longitude    (longitude) float64 36kB 12.3 12.3 12.3 12.3 ... 13.1 13.1 13.1
    spatial_ref  int32 4B 4326
Data variables:
    wcover       (latitude, longitude) float32 30MB dask.array<chunksize=(1300, 1300), meta=np.ndarray>

1.8 Fuse cube

Here we fuse the four data cubes together and mask out pixels that are classified as permanent water bodies (class 80) in the ESA World Cover map. We can now drop the orbit coordinates as well as time slices which contain no land backscattering data.

flood_dc = xr.merge([sig0_dc, plia_dc, hpar_dc, wcover_dc])
flood_dc = flood_dc.where(flood_dc.wcover != 80)
flood_dc = flood_dc.\
    reset_index("orbit", drop=True).\
    rename({"orbit": "time"}).\
    dropna(dim="time", how="all", subset=["sig0"])
flood_dc = flood_dc.persist()
wait(flood_dc)
flood_dc
<xarray.Dataset> Size: 2GB
Dimensions:      (time: 6, latitude: 1668, longitude: 4445)
Coordinates:
  * latitude     (latitude) float64 13kB 54.6 54.6 54.6 54.6 ... 54.3 54.3 54.3
  * longitude    (longitude) float64 36kB 12.3 12.3 12.3 12.3 ... 13.1 13.1 13.1
    spatial_ref  int32 4B 4326
  * time         (time) datetime64[ns] 48B 2022-10-11T05:25:01 ... 2022-10-23...
Data variables:
    sig0         (time, latitude, longitude) float32 178MB dask.array<chunksize=(1, 1300, 1300), meta=np.ndarray>
    MPLIA        (time, latitude, longitude) float32 178MB dask.array<chunksize=(1, 1300, 1300), meta=np.ndarray>
    C1           (time, latitude, longitude) float32 178MB dask.array<chunksize=(1, 1300, 1300), meta=np.ndarray>
    C2           (time, latitude, longitude) float32 178MB dask.array<chunksize=(1, 1300, 1300), meta=np.ndarray>
    C3           (time, latitude, longitude) float32 178MB dask.array<chunksize=(1, 1300, 1300), meta=np.ndarray>
    M0           (time, latitude, longitude) float32 178MB dask.array<chunksize=(1, 1300, 1300), meta=np.ndarray>
    S1           (time, latitude, longitude) float32 178MB dask.array<chunksize=(1, 1300, 1300), meta=np.ndarray>
    S2           (time, latitude, longitude) float32 178MB dask.array<chunksize=(1, 1300, 1300), meta=np.ndarray>
    S3           (time, latitude, longitude) float32 178MB dask.array<chunksize=(1, 1300, 1300), meta=np.ndarray>
    STD          (time, latitude, longitude) float32 178MB dask.array<chunksize=(1, 1300, 1300), meta=np.ndarray>
    wcover       (latitude, longitude) float32 30MB dask.array<chunksize=(1300, 1300), meta=np.ndarray>
1.9 Likelihoods

Now we are ready to calculate the likelihoods of microwave backscattering given flooding (or non-flooding).
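
For reference, the Bayesian decision that follows rests on Bayes' theorem, which combines these likelihoods with the prior probabilities of flooding \(P(F)\) and non-flooding \(P(NF)\):

\[
P(F|\sigma^0) = \frac{P(\sigma^0|F)\,P(F)}{P(\sigma^0)}, \qquad P(\sigma^0) = P(\sigma^0|F)\,P(F) + P(\sigma^0|NF)\,P(NF)
\]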

1.9.1 Water

We start with water, which is the simplest calculation; the hard-coded values are the coefficients of a regression model fitted to global water backscattering values.

def calc_water_likelihood(dc):
    return dc.MPLIA * -0.394181 + -4.142015

flood_dc["wbsc"] = calc_water_likelihood(flood_dc)

1.9.2 Land

For land backscattering we construct the harmonic model from the parameters contained in the fused data cube. By doing so, we obtain a reference land backscattering value against which to compare our actually observed sigma naught values.

def harmonic_expected_backscatter(dc):
    w = np.pi * 2 / 365

    t = dc.time.dt.dayofyear
    wt = w * t

    M0 = dc.M0
    S1 = dc.S1
    S2 = dc.S2
    S3 = dc.S3
    C1 = dc.C1
    C2 = dc.C2
    C3 = dc.C3
    hm_c1 = (M0 + S1 * np.sin(wt)) + (C1 * np.cos(wt))
    hm_c2 = ((hm_c1 + S2 * np.sin(2 * wt)) + C2 * np.cos(2 * wt))
    hm_c3 = ((hm_c2 + S3 * np.sin(3 * wt)) + C3 * np.cos(3 * wt))
    return hm_c3

flood_dc["hbsc"] = harmonic_expected_backscatter(flood_dc)

1.10 Flood mapping

Having calculated the likelihoods, we can now move on to calculate the probability of (non-)flooding given a pixel’s \(\sigma^0\). These so-called posteriors need one more piece of information, as can be seen in the equation above. We need the probability that a pixel is flooded \(P(F)\) or not flooded \(P(NF)\). Of course, these are the figures we’ve been trying to find this whole time. We don’t actually have them yet, so what can we do? In Bayesian statistics, we can just start with our best guess. These guesses are called our “priors”, because they are the beliefs we hold prior to looking at the data. This subjective prior belief is the foundation of Bayesian statistics, and we use the likelihoods we just calculated to update our belief in this particular hypothesis. This updated belief is called the “posterior”.

Let’s say that our best estimate for the chance of flooding versus non-flooding of a pixel is 50-50: a coin flip. We can now also calculate the probability of backscattering \(P(\sigma^0)\) as the weighted average of the water and land likelihoods, ensuring that our posteriors range between 0 and 1.

The following code block shows how we calculate these posteriors, which allow us to predict whether it is likely that a land pixel became flooded.

def bayesian_flood_decision(dc):

    wat_std = 2.754041   # fixed standard deviation of the water backscatter distribution
    sig0 = dc.sig0
    std = dc.STD         # pixel-wise standard deviation of the harmonic land model
    wbsc = dc.wbsc
    hbsc = dc.hbsc

    # Gaussian likelihoods of the observed backscatter under the flood (water)
    # and non-flood (land) hypotheses
    f_prob = (1.0 / (wat_std * np.sqrt(2 * np.pi))) * np.exp(-0.5 *
        (((sig0 - wbsc) / wat_std) ** 2))
    nf_prob = (1.0 / (std * np.sqrt(2 * np.pi))) * np.exp(-0.5 *
        (((sig0 - hbsc) / std) ** 2))

    # evidence with 50-50 priors and the resulting posterior probabilities
    evidence = (nf_prob * 0.5) + (f_prob * 0.5)
    nf_post_prob = (nf_prob * 0.5) / evidence
    f_post_prob = (f_prob * 0.5) / evidence
    decision = xr.where(np.isnan(f_post_prob) | np.isnan(nf_post_prob), np.nan,
                        np.greater(f_post_prob, nf_post_prob))
    return nf_post_prob, f_post_prob, decision

flood_dc[["nf_post_prob", "f_post_prob", "decision"]] = bayesian_flood_decision(flood_dc)

1.11 Postprocessing

We continue by improving our flood map, filtering out observations that we expect to have low sensitivity to flooding based on a predefined set of criteria.

These criteria include:

* Masking of Exceeding Incidence Angles
* Identification of Conflicting Distributions
* Removal of Measurement Outliers
* Denial of High Uncertainty on Decision

def post_processing(dc):
    # mask exceeding incidence angles
    dc = dc * np.logical_and(dc.MPLIA >= 27, dc.MPLIA <= 48)
    # identify conflicting distributions
    dc = dc * (dc.hbsc > (dc.wbsc + 0.5 * 2.754041))
    # remove measurement outliers
    land_bsc_lower = dc.hbsc - 3 * dc.STD
    land_bsc_upper = dc.hbsc + 3 * dc.STD
    water_bsc_upper = dc.wbsc + 3 * 2.754041
    mask_land_outliers = np.logical_and(dc.sig0 > land_bsc_lower, dc.sig0 < land_bsc_upper)
    mask_water_outliers = dc.sig0 < water_bsc_upper
    dc = dc * (mask_land_outliers | mask_water_outliers)
    # deny high uncertainty on the decision
    return (dc * (dc.f_post_prob > 0.8)).decision

flood_output = post_processing(flood_dc)

1.12 Removal of Speckles

The following step is designed to further improve the clarity of the flood maps. These filters do not directly relate to prior knowledge of backscattering, but consist of contextual evidence that supports, or opposes, a flood classification. This mainly targets so-called speckles: areas of one or a few pixels that are likely the result of the diversity of scattering surfaces at a sub-pixel level. In this approach it is argued that small, solitary flood surfaces are unlikely. Hence speckles are removed by applying a smoothing filter, which consists of a rolling-window median along the x and y axes simultaneously.

flood_output = flood_output.rolling({"longitude": 5, "latitude": 5}, center=True).median(skipna=True).persist()
wait(flood_output)
flood_output
<xarray.DataArray 'decision' (time: 6, latitude: 1668, longitude: 4445)> Size: 356MB
dask.array<where, shape=(6, 1668, 4445), dtype=float64, chunksize=(1, 1302, 1302), chunktype=numpy.ndarray>
Coordinates:
  * latitude     (latitude) float64 13kB 54.6 54.6 54.6 54.6 ... 54.3 54.3 54.3
  * longitude    (longitude) float64 36kB 12.3 12.3 12.3 12.3 ... 13.1 13.1 13.1
    spatial_ref  int32 4B 4326
  * time         (time) datetime64[ns] 48B 2022-10-11T05:25:01 ... 2022-10-23...

1.13 Results

We are now ready to visualize our results.

flood_output.plot(col="time", col_wrap=3)

In the following graphic we superimpose the data on a map and we can move the slider to see which areas become flooded over time.

flood_output.hvplot.quadmesh(x='longitude', y='latitude', geo=True, widget_location='bottom', rasterize=True, \
                             project=True, clim=(0,1), cmap=["rgba(0, 0, 1, 0.1)","darkred"], tiles=True, \
                             clabel="        non-flood                                        flood")