Remote Flood Mapping#

Besides processing of a flood’s extent on your own machine one can also offload the processing graph to the EODC Dask Gateway.

This notebook is based on the tutorial to connect to EODC Dask available at this link.

More information about EODC Dask Gateway can be found here.

Connect to EODC Dask#

Autentication is required through an username and password that should be requested to EODC.

from eodc.dask import EODCDaskGateway
from rich.console import Console
from rich.prompt import Prompt

console = Console()
your_username = Prompt.ask(prompt="Enter your Username")
gateway = EODCDaskGateway(username=your_username)
Enter your Username: 
---------------------------------------------------------------------------
StdinNotImplementedError                  Traceback (most recent call last)
Cell In[1], line 6
      3 from rich.prompt import Prompt
      5 console = Console()
----> 6 your_username = Prompt.ask(prompt="Enter your Username")
      7 gateway = EODCDaskGateway(username=your_username)

File /opt/hostedtoolcache/Python/3.10.16/x64/lib/python3.10/site-packages/rich/prompt.py:149, in PromptBase.ask(cls, prompt, console, password, choices, case_sensitive, show_default, show_choices, default, stream)
    125 """Shortcut to construct and run a prompt loop and return the result.
    126 
    127 Example:
   (...)
    138     stream (TextIO, optional): Optional text file open for reading to get input. Defaults to None.
    139 """
    140 _prompt = cls(
    141     prompt,
    142     console=console,
   (...)
    147     show_choices=show_choices,
    148 )
--> 149 return _prompt(default=default, stream=stream)

File /opt/hostedtoolcache/Python/3.10.16/x64/lib/python3.10/site-packages/rich/prompt.py:292, in PromptBase.__call__(self, default, stream)
    290 self.pre_prompt()
    291 prompt = self.make_prompt(default)
--> 292 value = self.get_input(self.console, prompt, self.password, stream=stream)
    293 if value == "" and default != ...:
    294     return default

File /opt/hostedtoolcache/Python/3.10.16/x64/lib/python3.10/site-packages/rich/prompt.py:211, in PromptBase.get_input(cls, console, prompt, password, stream)
    193 @classmethod
    194 def get_input(
    195     cls,
   (...)
    199     stream: Optional[TextIO] = None,
    200 ) -> str:
    201     """Get input from user.
    202 
    203     Args:
   (...)
    209         str: String from user.
    210     """
--> 211     return console.input(prompt, password=password, stream=stream)

File /opt/hostedtoolcache/Python/3.10.16/x64/lib/python3.10/site-packages/rich/console.py:2151, in Console.input(self, prompt, markup, emoji, password, stream)
   2149         result = stream.readline()
   2150     else:
-> 2151         result = input()
   2152 return result

File /opt/hostedtoolcache/Python/3.10.16/x64/lib/python3.10/site-packages/ipykernel/kernelbase.py:1281, in Kernel.raw_input(self, prompt)
   1279 if not self._allow_stdin:
   1280     msg = "raw_input was called, but this frontend does not support input requests."
-> 1281     raise StdinNotImplementedError(msg)
   1282 return self._input_request(
   1283     str(prompt),
   1284     self._parent_ident["shell"],
   1285     self.get_parent("shell"),
   1286     password=False,
   1287 )

StdinNotImplementedError: raw_input was called, but this frontend does not support input requests.

Cluster Configuration#

It is possible to change the default configuration to optimize your specifications at the cell below.

cluster_options = gateway.cluster_options()
cluster_options

Create Dask Cluster#

Initiate a client#

In order to create a Dask cluster, it is first necessary to initiate the client.

Important: Per default no worker is spawned, therefore please use the widget to add/scale Dask workers in other to enable compuations on the cluster.
cluster = gateway.new_cluster(cluster_options)
client = cluster.get_client()
cluster

Another way to spawn workers is trhough the cell below that sets two workers as starting point and enables up to 5 workers, if necessary.

cluster.adapt(minimum=2, maximum=5)

List available clusters#

console.print(gateway.list_clusters())

Connect to already running clusters#

Instead of restarting the whole process again, you can directly connect to cluster already running.

cluster = gateway.connect(gateway.list_clusters()[0].name)
console.print(cluster)

Dask dashboard#

The following link displays the Dask Dashboard that can be used to monitor the execution of computations.

cluster.dashboard_link

Remote Dask Flood Mapping#

Uisng the same example from the local dask setup, the event chosen is storm Babet which hit the Danish and Northern coast of Germany at the 20th of October 2023 Wikipedia. We target an area around Zingst at the Baltic coast of Northern Germany.

from dask_flood_mapper import flood
time_range = "2022-10-11/2022-10-25"
bbox = [12.3, 54.3, 13.1, 54.6]
fd = flood.probability(bbox=bbox, datetime=time_range).compute()
fd
import hvplot.xarray


fd.hvplot.image(
    x="x",
    y="y",
    rasterize=True,
    geo=True,
    tiles=True,
    project=True,
    frame_height=400,
)

Make sure to shutdown the cluster.

cluster.close(shutdown=True)