Skip to content

asf_tools v0.8.3 API Reference

Tools developed by ASF for working with SAR data

composite

Create a local-resolution-weighted composite from Sentinel-1 RTC products.

Create a local-resolution-weighted composite from a set of Sentinel-1 RTC products (D. Small, 2012). The local resolution, defined as the inverse of the local contributing (scattering) area, is used to weight each RTC products' contributions to the composite image on a pixel-by-pixel basis. The composite image is created as a Cloud Optimized GeoTIFF (COG). Additionally, a COG specifying the number of rasters contributing to each composite pixel is created.

References

David Small, 2012: https://doi.org/10.1109/IGARSS.2012.6350465

get_area_raster(raster)

Determine the path of the area raster for a given backscatter raster based on naming conventions for HyP3 RTC products

Parameters:

Name Type Description Default
raster str

path of the backscatter raster, e.g. S1A_IW_20181102T155531_DVP_RTC30_G_gpuned_5685_VV.tif

required

Returns:

Name Type Description
area_raster str

path of the area raster, e.g. S1A_IW_20181102T155531_DVP_RTC30_G_gpuned_5685_area.tif

Source code in asf_tools/composite.py
60
61
62
63
64
65
66
67
68
69
70
def get_area_raster(raster: str) -> str:
    """Determine the path of the area raster for a given backscatter raster based on naming conventions for HyP3 RTC
    products

    Args:
        raster: path of the backscatter raster, e.g. S1A_IW_20181102T155531_DVP_RTC30_G_gpuned_5685_VV.tif

    Returns:
        area_raster: path of the area raster, e.g. S1A_IW_20181102T155531_DVP_RTC30_G_gpuned_5685_area.tif
    """
    return '_'.join(raster.split('_')[:-1] + ['area.tif'])

get_full_extent(raster_info)

Determine the corner coordinates and geotransform for the full extent of a set of rasters

Parameters:

Name Type Description Default
raster_info dict

A dictionary of gdal.Info results for the set of rasters

required

Returns:

Name Type Description
upper_left

The upper left corner of the extent as a tuple

upper_right

The lower right corner of the extent as a tuple

geotransform

The geotransform of the extent as a list

Source code in asf_tools/composite.py
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
def get_full_extent(raster_info: dict):
    """Determine the corner coordinates and geotransform for the full extent of a set of rasters

    Args:
        raster_info: A dictionary of gdal.Info results for the set of rasters

    Returns:
        upper_left: The upper left corner of the extent as a tuple
        upper_right: The lower right corner of the extent as a tuple
        geotransform: The geotransform of the extent as a list
    """
    upper_left_corners = [info['cornerCoordinates']['upperLeft'] for info in raster_info.values()]
    lower_right_corners = [info['cornerCoordinates']['lowerRight'] for info in raster_info.values()]

    ulx = min([ul[0] for ul in upper_left_corners])
    uly = max([ul[1] for ul in upper_left_corners])
    lrx = max([lr[0] for lr in lower_right_corners])
    lry = min([lr[1] for lr in lower_right_corners])

    log.debug(f'Full extent raster upper left: ({ulx, uly}); lower right: ({lrx, lry})')

    trans = []
    for info in raster_info.values():
        # Only need info from any one raster
        trans = info['geoTransform']
        break

    trans[0] = ulx
    trans[3] = uly

    return (ulx, uly), (lrx, lry), trans

get_target_epsg_code(codes)

Determine the target UTM EPSG projection for the output composite

Parameters:

Name Type Description Default
codes list[int]

List of UTM EPSG codes

required

Returns:

Name Type Description
target int

UTM EPSG code

Source code in asf_tools/composite.py
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
def get_target_epsg_code(codes: list[int]) -> int:
    """Determine the target UTM EPSG projection for the output composite

    Args:
        codes: List of UTM EPSG codes

    Returns:
        target: UTM EPSG code
    """
    # use median east/west UTM zone of all files, regardless of hemisphere
    # UTM EPSG codes for each hemisphere will look like:
    #   North: 326XX
    #   South: 327XX
    valid_codes = list(range(32601, 32661)) + list(range(32701, 32761))
    if bad_codes := set(codes) - set(valid_codes):
        raise ValueError(f'Non UTM EPSG code encountered: {bad_codes}')

    hemispheres = [c // 100 * 100 for c in codes]
    # if even modes, choose lowest (North)
    target_hemisphere = min(multimode(hemispheres))

    zones = sorted([c % 100 for c in codes])
    # if even length, choose fist of median two
    target_zone = zones[(len(zones) - 1) // 2]

    return target_hemisphere + target_zone

make_composite(out_name, rasters, resolution=None)

Creates a local-resolution-weighted composite from Sentinel-1 RTC products

Parameters:

Name Type Description Default
out_name str

The base name of the output GeoTIFFs

required
rasters list[str]

A list of file paths of the images to composite

required
resolution float | None

The pixel size for the output GeoTIFFs

None

Returns:

Name Type Description
out_raster

Path to the created composite backscatter GeoTIFF

out_counts_raster

Path to the created GeoTIFF with counts of scenes contributing to each pixel

Source code in asf_tools/composite.py
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
def make_composite(out_name: str, rasters: list[str], resolution: float | None = None):
    """Creates a local-resolution-weighted composite from Sentinel-1 RTC products

    Args:
        out_name: The base name of the output GeoTIFFs
        rasters: A list of file paths of the images to composite
        resolution: The pixel size for the output GeoTIFFs

    Returns:
        out_raster: Path to the created composite backscatter GeoTIFF
        out_counts_raster: Path to the created GeoTIFF with counts of scenes contributing to each pixel
    """
    if not rasters:
        raise ValueError('Must specify at least one raster to composite')

    raster_info = {}
    for raster in rasters:
        raster_info[raster] = gdal.Info(raster, format='json')
        # make sure gdal can read the area raster
        gdal.Info(get_area_raster(raster))

    target_epsg_code = get_target_epsg_code([get_epsg_code(info) for info in raster_info.values()])
    log.debug(f'Composite projection is EPSG:{target_epsg_code}')

    if resolution is None:
        resolution = max([info['geoTransform'][1] for info in raster_info.values()])
    log.debug(f'Composite resolution is {resolution} meters')

    # resample rasters to maximum resolution & common UTM zone
    with TemporaryDirectory(prefix='reprojected_') as temp_dir:
        raster_info = reproject_to_target(
            raster_info,
            target_epsg_code=target_epsg_code,
            target_resolution=resolution,
            directory=temp_dir,
        )

        # Get extent of union of all images
        full_ul, full_lr, full_trans = get_full_extent(raster_info)

        nx = int(abs(full_ul[0] - full_lr[0]) // resolution)
        ny = int(abs(full_ul[1] - full_lr[1]) // resolution)

        outputs = np.zeros((ny, nx))
        weights = np.zeros(outputs.shape)
        counts = np.zeros(outputs.shape, dtype=np.int8)

        for raster, info in raster_info.items():
            log.info(f'Processing raster {raster}')
            log.debug(
                f'Raster upper left: {info["cornerCoordinates"]["upperLeft"]}; '
                f'lower right: {info["cornerCoordinates"]["lowerRight"]}'
            )

            values = read_as_array(raster)

            area_raster = get_area_raster(raster)
            areas = read_as_array(area_raster)

            ulx, uly = info['cornerCoordinates']['upperLeft']
            y_index_start = int((full_ul[1] - uly) // resolution)
            y_index_end = y_index_start + values.shape[0]

            x_index_start = int((ulx - full_ul[0]) // resolution)
            x_index_end = x_index_start + values.shape[1]

            log.debug(
                f'Placing values in output grid at {y_index_start}:{y_index_end} and {x_index_start}:{x_index_end}'
            )

            mask = values == 0
            raster_weights = 1.0 / areas
            raster_weights[mask] = 0

            outputs[y_index_start:y_index_end, x_index_start:x_index_end] += values * raster_weights
            weights[y_index_start:y_index_end, x_index_start:x_index_end] += raster_weights
            counts[y_index_start:y_index_end, x_index_start:x_index_end] += ~mask

    del values, areas, mask, raster_weights

    # Divide by the total weight applied
    outputs /= weights
    del weights

    out_raster = write_cog(f'{out_name}.tif', outputs, full_trans, target_epsg_code, nodata_value=0)
    del outputs

    out_counts_raster = write_cog(
        f'{out_name}_counts.tif',
        counts,
        full_trans,
        target_epsg_code,
        dtype=gdal.GDT_Int16,
    )
    del counts

    return out_raster, out_counts_raster

reproject_to_target(raster_info, target_epsg_code, target_resolution, directory)

Reprojects a set of raster images to a common projection and resolution

Parameters:

Name Type Description Default
raster_info dict

A dictionary of gdal.Info results for the set of rasters

required
target_epsg_code int

The integer EPSG code for the target projection

required
target_resolution float

The target resolution

required
directory str

The directory in which to create the reprojected files

required

Returns:

Name Type Description
target_raster_info dict

An updated dictionary of gdal.Info results for the reprojected files

Source code in asf_tools/composite.py
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
def reproject_to_target(raster_info: dict, target_epsg_code: int, target_resolution: float, directory: str) -> dict:
    """Reprojects a set of raster images to a common projection and resolution

    Args:
        raster_info: A dictionary of gdal.Info results for the set of rasters
        target_epsg_code: The integer EPSG code for the target projection
        target_resolution: The target resolution
        directory: The directory in which to create the reprojected files

    Returns:
        target_raster_info: An updated dictionary of gdal.Info results for the reprojected files
    """
    target_raster_info = {}
    for raster, info in raster_info.items():
        epsg_code = get_epsg_code(info)
        resolution = info['geoTransform'][1]
        if epsg_code != target_epsg_code or resolution != target_resolution:
            log.info(f'Reprojecting {raster}')
            reprojected_raster = os.path.join(directory, os.path.basename(raster))
            gdal.Warp(
                reprojected_raster,
                raster,
                dstSRS=f'EPSG:{target_epsg_code}',
                xRes=target_resolution,
                yRes=target_resolution,
                targetAlignedPixels=True,
            )

            area_raster = get_area_raster(raster)
            log.info(f'Reprojecting {area_raster}')
            reprojected_area_raster = os.path.join(directory, os.path.basename(area_raster))
            gdal.Warp(
                reprojected_area_raster,
                area_raster,
                dstSRS=f'EPSG:{target_epsg_code}',
                xRes=target_resolution,
                yRes=target_resolution,
                targetAlignedPixels=True,
            )

            target_raster_info[reprojected_raster] = gdal.Info(reprojected_raster, format='json')
        else:
            log.info(f'No need to reproject {raster}')
            target_raster_info[raster] = info

    return target_raster_info

dem

Prepare a Copernicus GLO-30 DEM virtual raster (VRT) covering a given geometry

prepare_dem_vrt(vrt, geometry)

Create a DEM mosaic VRT covering a given geometry

The DEM mosaic is assembled from the Copernicus GLO-30 DEM tiles that intersect the geometry.

Note: asf_tools does not currently support geometries that cross the antimeridian.

Parameters:

Name Type Description Default
vrt str | Path

Path for the output VRT file

required
geometry Geometry | BaseGeometry

Geometry in EPSG:4326 (lon/lat) projection for which to prepare a DEM mosaic

required
Source code in asf_tools/dem.py
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
def prepare_dem_vrt(vrt: str | Path, geometry: ogr.Geometry | BaseGeometry):
    """Create a DEM mosaic VRT covering a given geometry

    The DEM mosaic is assembled from the Copernicus GLO-30 DEM tiles that intersect the geometry.

    Note: `asf_tools` does not currently support geometries that cross the antimeridian.

    Args:
        vrt: Path for the output VRT file
        geometry: Geometry in EPSG:4326 (lon/lat) projection for which to prepare a DEM mosaic

    """
    with GDALConfigManager(GDAL_DISABLE_READDIR_ON_OPEN='EMPTY_DIR'):
        if isinstance(geometry, BaseGeometry):
            geometry = ogr.CreateGeometryFromWkb(geometry.wkb)

        min_lon, max_lon, _, _ = geometry.GetEnvelope()
        if min_lon < -160.0 and max_lon > 160.0:
            raise ValueError(f'asf_tools does not currently support geometries that cross the antimeridian: {geometry}')

        tile_features = vector.get_features(DEM_GEOJSON)
        if not vector.get_property_values_for_intersecting_features(geometry, tile_features):
            raise ValueError(f'Copernicus GLO-30 DEM does not intersect this geometry: {geometry}')

        dem_file_paths = vector.intersecting_feature_properties(geometry, tile_features, 'file_path')

        gdal.BuildVRT(str(vrt), dem_file_paths)

hydrosar

flood_map

Generate flood depth map from surface water extent map.

Create a flood depth map from a surface water extent map and a HAND image. The HAND image must be pixel-aligned (same extent and size) to the water extent map, and the surface water extent map should be a byte GeoTIFF indicating water (true), not water (false). Flood depth maps are estimated using either a numerical, normalized median absolute deviation, logarithmic or iterative approach.

logstat(data, func=np.nanstd)

Calculate a function in logarithmic scale and return in linear scale. INF values inside the data array are set to nan.

Parameters:

Name Type Description Default
data ndarray

array of data

required
func Callable

statistical function to calculate in logarithmic scale

nanstd

Returns: statistic: statistic of data in linear scale

Source code in asf_tools/hydrosar/flood_map.py
124
125
126
127
128
129
130
131
132
133
134
135
136
137
def logstat(data: np.ndarray, func: Callable = np.nanstd) -> np.ndarray | float:
    """Calculate a function in logarithmic scale and return in linear scale.
    INF values inside the data array are set to nan.

    Args:
        data: array of data
        func: statistical function to calculate in logarithmic scale
    Returns:
        statistic: statistic of data in linear scale
    """
    ld = np.log(data)
    ld[np.isinf(ld)] = np.nan
    st = func(ld)
    return np.exp(st)

make_flood_map(out_raster, vv_raster, water_raster, hand_raster, estimator='iterative', water_level_sigma=3.0, known_water_threshold=None, iterative_bounds=(0, 15), iterative_min_size=0, minimization_metric='ts')

Create a flood depth map from a surface water extent map.

WARNING: This functionality is still under active development and the products created using this function are likely to change in the future.

Create a flood depth map from a single surface water extent map and a HAND image. The HAND image must be pixel-aligned to the surface water extent map. The surface water extent map should be a byte GeoTIFF indicating water (true) and not water (false)

Known perennial Global Surface-water data are produced under the Copernicus Programme (Pekel et al., 2016), and are included with surface-water detection maps when generating the flood depth product.

Flood depth maps are estimated using one of the approaches: Iterative: (Default) Basin hopping optimization method matches flooded areas to flood depth estimates given by the HAND layer. This is the most accurate method but also the most time-intensive. Normalized Median Absolute Deviation (nmad): Uses a median operator to estimate the variation to increase robustness in the presence of outliers. Logstat: Calculates the mean and standard deviation of HAND heights in the logarithmic domain to improve robustness for very non-Gaussian data distributions. Numpy: Calculates statistics on a linear scale. Least robust to outliers and non-Gaussian distributions.

Parameters:

Name Type Description Default
out_raster str | Path

Flood depth GeoTIFF to create

required
vv_raster str | Path

Sentinel-1 RTC GeoTIFF, in power scale, with VV polarization

required
water_raster str | Path

Surface water extent GeoTIFF

required
hand_raster str | Path

Height Above Nearest Drainage (HAND) GeoTIFF aligned to the surface water extent raster

required
estimator str

Estimation approach for determining flood depth

'iterative'
water_level_sigma float

Max water height used in logstat, nmad, and numpy estimations

3.0
known_water_threshold float | None

Threshold for extracting the known water area in percent. If None, the threshold is calculated.

None
iterative_bounds tuple[int, int]

Minimum and maximum bound on the flood depths calculated by the basin-hopping algorithm used in the iterative estimator

(0, 15)
iterative_min_size int

Minimum size of a connected waterbody in pixels for calculating flood depths with the iterative estimator. Waterbodies smaller than this wll be skipped.

0
minimization_metric str

Evaluation method to minimize when using the iterative estimator. Options include a Fowlkes-Mallows index (fmi) or a threat score (ts).

'ts'
References

Jean-Francios Pekel, Andrew Cottam, Noel Gorelik, Alan S. Belward. 2016. https://doi:10.1038/nature20584

Source code in asf_tools/hydrosar/flood_map.py
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
def make_flood_map(
    out_raster: str | Path,
    vv_raster: str | Path,
    water_raster: str | Path,
    hand_raster: str | Path,
    estimator: str = 'iterative',
    water_level_sigma: float = 3.0,
    known_water_threshold: float | None = None,
    iterative_bounds: tuple[int, int] = (0, 15),
    iterative_min_size: int = 0,
    minimization_metric: str = 'ts',
):
    """Create a flood depth map from a surface water extent map.

    WARNING: This functionality is still under active development and the products
    created using this function are likely to change in the future.

    Create a flood depth map from a single surface water extent map and
    a HAND image. The HAND image must be pixel-aligned to the surface water extent map.
    The surface water extent map should be a byte GeoTIFF indicating water (true) and
    not water (false)

    Known perennial Global Surface-water data are produced under the Copernicus Programme (Pekel et al., 2016),
    and are included with surface-water detection maps when generating the flood depth product.

    Flood depth maps are estimated using one of the approaches:
    *Iterative: (Default) Basin hopping optimization method matches flooded areas to flood depth
    estimates given by the HAND layer. This is the most accurate method but also the
    most time-intensive.
    *Normalized Median Absolute Deviation (nmad): Uses a median operator to estimate
    the variation to increase robustness in the presence of outliers.
    *Logstat: Calculates the mean and standard deviation of HAND heights in the logarithmic
    domain to improve robustness for very non-Gaussian data distributions.
    *Numpy: Calculates statistics on a linear scale. Least robust to outliers and non-Gaussian
    distributions.

    Args:
        out_raster: Flood depth GeoTIFF to create
        vv_raster: Sentinel-1 RTC GeoTIFF, in power scale, with VV polarization
        water_raster: Surface water extent GeoTIFF
        hand_raster: Height Above Nearest Drainage (HAND) GeoTIFF aligned to the surface water extent raster
        estimator: Estimation approach for determining flood depth
        water_level_sigma: Max water height used in logstat, nmad, and numpy estimations
        known_water_threshold: Threshold for extracting the known water area in percent.
            If `None`, the threshold is calculated.
        iterative_bounds: Minimum and maximum bound on the flood depths calculated by the basin-hopping algorithm
            used in the iterative estimator
        iterative_min_size: Minimum size of a connected waterbody in pixels for calculating flood depths with the
            iterative estimator. Waterbodies smaller than this wll be skipped.
        minimization_metric: Evaluation method to minimize when using the iterative estimator.
            Options include a Fowlkes-Mallows index (fmi) or a threat score (ts).

    References:
        Jean-Francios Pekel, Andrew Cottam, Noel Gorelik, Alan S. Belward. 2016. <https://doi:10.1038/nature20584>
    """
    info = gdal.Info(str(water_raster), format='json')
    epsg = get_epsg_code(info)
    geotransform = info['geoTransform']
    hand_array = gdal.Open(str(hand_raster), gdal.GA_ReadOnly).ReadAsArray()

    log.info('Fetching perennial flood data.')
    known_water_mask = get_waterbody(info, threshold=known_water_threshold)
    write_cog(
        str(out_raster).replace('.tif', f'_{estimator}_PW.tif'),
        known_water_mask,
        transform=geotransform,
        epsg_code=epsg,
        dtype=gdal.GDT_Byte,
        nodata_value=False,
    )

    water_map = gdal.Open(str(water_raster)).ReadAsArray()
    flood_mask = np.logical_or(water_map, known_water_mask)
    del water_map

    vv_array = read_as_masked_array(vv_raster)
    flood_mask[vv_array.mask] = False
    padding_mask = vv_array.mask
    del vv_array

    labeled_flood_mask, num_labels = ndimage.label(flood_mask)
    object_slices = ndimage.find_objects(labeled_flood_mask)
    log.info(f'Detected {num_labels} waterbodies...')
    if estimator.lower() == 'iterative':
        log.info(f'Skipping waterbodies less than {iterative_min_size} pixels.')

    flood_depth = np.zeros(flood_mask.shape)

    for ll in tqdm(range(1, num_labels)):  # Skip first, largest label.
        slices = object_slices[ll - 1]
        min0, max0 = slices[0].start, slices[0].stop
        min1, max1 = slices[1].start, slices[1].stop

        flood_window = labeled_flood_mask[min0:max0, min1:max1]
        hand_window = hand_array[min0:max0, min1:max1]

        water_height = estimate_flood_depth(
            ll,
            hand_window,
            flood_window,
            estimator=estimator,
            water_level_sigma=water_level_sigma,
            iterative_bounds=iterative_bounds,
            minimization_metric=minimization_metric,
            iterative_min_size=iterative_min_size,
        )

        flood_depth_window = flood_depth[min0:max0, min1:max1]
        flood_depth_window[flood_window == ll] = water_height - hand_window[flood_window == ll]

    flood_depth[flood_depth < 0] = 0

    nodata = -1
    flood_depth[padding_mask] = nodata

    floodmask_nodata = np.iinfo(np.uint8).max
    flood_mask_byte = flood_mask.astype(np.uint8)
    flood_mask_byte[padding_mask] = floodmask_nodata

    write_cog(
        str(out_raster).replace('.tif', f'_{estimator}_WaterDepth.tif'),
        flood_depth,
        transform=geotransform,
        epsg_code=epsg,
        dtype=gdal.GDT_Float64,
        nodata_value=nodata,
    )
    write_cog(
        str(out_raster).replace('.tif', f'_{estimator}_FloodMask.tif'),
        flood_mask_byte,
        transform=geotransform,
        epsg_code=epsg,
        dtype=gdal.GDT_Byte,
        nodata_value=floodmask_nodata,
    )

    flood_mask[known_water_mask] = False
    flood_depth[np.logical_not(flood_mask)] = 0
    flood_depth[padding_mask] = nodata
    write_cog(
        str(out_raster).replace('.tif', f'_{estimator}_FloodDepth.tif'),
        flood_depth,
        transform=geotransform,
        epsg_code=epsg,
        dtype=gdal.GDT_Float64,
        nodata_value=nodata,
    )

hand

calculate_hand_for_basins(out_raster, geometries, dem_file, acc_thresh=100)

Calculate the Height Above Nearest Drainage (HAND) for watershed boundaries (hydrobasins).

For watershed boundaries, see: https://www.hydrosheds.org/page/hydrobasins

Parameters:

Name Type Description Default
out_raster str | Path

HAND GeoTIFF to create

required
geometries GeometryCollection

watershed boundary (hydrobasin) polygons to calculate HAND over

required
dem_file str | Path

DEM raster covering (containing) geometries

required
acc_thresh int | None

Accumulation threshold for determining the drainage mask. If None, the mean accumulation value is used

100
Source code in asf_tools/hydrosar/hand/calculate.py
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
def calculate_hand_for_basins(
    out_raster: str | Path,
    geometries: GeometryCollection,
    dem_file: str | Path,
    acc_thresh: int | None = 100,
):
    """Calculate the Height Above Nearest Drainage (HAND) for watershed boundaries (hydrobasins).

    For watershed boundaries, see: https://www.hydrosheds.org/page/hydrobasins

    Args:
        out_raster: HAND GeoTIFF to create
        geometries: watershed boundary (hydrobasin) polygons to calculate HAND over
        dem_file: DEM raster covering (containing) `geometries`
        acc_thresh: Accumulation threshold for determining the drainage mask.
            If `None`, the mean accumulation value is used
    """
    with rasterio.open(dem_file) as src:
        basin_mask, basin_affine_tf, basin_window = rasterio.mask.raster_geometry_mask(
            src, geometries.geoms, all_touched=True, crop=True, pad=True, pad_width=1
        )
        basin_array = src.read(1, window=basin_window)

        hand = calculate_hand(basin_array, basin_affine_tf, src.crs, basin_mask, acc_thresh=acc_thresh)

        write_cog(
            out_raster,
            hand,
            transform=basin_affine_tf.to_gdal(),
            epsg_code=src.crs.to_epsg(),
            nodata_value=np.nan,
        )

make_copernicus_hand(out_raster, vector_file, acc_thresh=100)

Copernicus GLO-30 Height Above Nearest Drainage (HAND)

Make a Height Above Nearest Drainage (HAND) GeoTIFF from the Copernicus GLO-30 DEM covering the watershed boundaries (hydrobasins) defined in a vector file.

For watershed boundaries, see: https://www.hydrosheds.org/page/hydrobasins

Parameters:

Name Type Description Default
out_raster str | Path

HAND GeoTIFF to create

required
vector_file str | Path

Vector file of watershed boundary (hydrobasin) polygons to calculate HAND over

required
acc_thresh int | None

Accumulation threshold for determining the drainage mask. If None, the mean accumulation value is used

100
Source code in asf_tools/hydrosar/hand/calculate.py
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
def make_copernicus_hand(
    out_raster: str | Path,
    vector_file: str | Path,
    acc_thresh: int | None = 100,
):
    """Copernicus GLO-30 Height Above Nearest Drainage (HAND)

    Make a Height Above Nearest Drainage (HAND) GeoTIFF from the Copernicus GLO-30 DEM
    covering the watershed boundaries (hydrobasins) defined in a vector file.

    For watershed boundaries, see: https://www.hydrosheds.org/page/hydrobasins

    Args:
        out_raster: HAND GeoTIFF to create
        vector_file: Vector file of watershed boundary (hydrobasin) polygons to calculate HAND over
        acc_thresh: Accumulation threshold for determining the drainage mask.
            If `None`, the mean accumulation value is used
    """
    with fiona.open(vector_file) as vds:
        geometries = GeometryCollection([shape(feature['geometry']) for feature in vds])

    with NamedTemporaryFile(suffix='.vrt', delete=False) as dem_vrt:
        prepare_dem_vrt(dem_vrt.name, geometries)
        calculate_hand_for_basins(out_raster, geometries, dem_vrt.name, acc_thresh=acc_thresh)

prepare_hand_vrt(vrt, geometry)

Prepare a HAND mosaic VRT covering a given geometry

Prepare a Height Above Nearest Drainage (HAND) virtual raster (VRT) covering a given geometry. The Height Above Nearest Drainage (HAND) mosaic is assembled from the HAND tiles that intersect the geometry, using a HAND derived from the Copernicus GLO-30 DEM.

Note: asf_tools does not currently support geometries that cross the antimeridian.

Parameters:

Name Type Description Default
vrt str | Path

Path for the output VRT file

required
geometry Geometry | BaseGeometry

Geometry in EPSG:4326 (lon/lat) projection for which to prepare a HAND mosaic

required
Source code in asf_tools/hydrosar/hand/prepare.py
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
def prepare_hand_vrt(vrt: str | Path, geometry: ogr.Geometry | BaseGeometry):
    """Prepare a HAND mosaic VRT covering a given geometry

    Prepare a Height Above Nearest Drainage (HAND) virtual raster (VRT) covering a given geometry.
    The Height Above Nearest Drainage (HAND) mosaic is assembled from the HAND tiles that intersect
    the geometry, using a HAND derived from the Copernicus GLO-30 DEM.

    Note: `asf_tools` does not currently support geometries that cross the antimeridian.

    Args:
        vrt: Path for the output VRT file
        geometry: Geometry in EPSG:4326 (lon/lat) projection for which to prepare a HAND mosaic

    """
    with GDALConfigManager(GDAL_DISABLE_READDIR_ON_OPEN='EMPTY_DIR'):
        if isinstance(geometry, BaseGeometry):
            geometry = ogr.CreateGeometryFromWkb(geometry.wkb)

        min_lon, max_lon, _, _ = geometry.GetEnvelope()
        if min_lon < -160.0 and max_lon > 160.0:
            raise ValueError(f'asf_tools does not currently support geometries that cross the antimeridian: {geometry}')

        tile_features = vector.get_features(HAND_GEOJSON)
        if not vector.get_property_values_for_intersecting_features(geometry, tile_features):
            raise ValueError(f'Copernicus GLO-30 HAND does not intersect this geometry: {geometry}')

        hand_file_paths = vector.intersecting_feature_properties(geometry, tile_features, 'file_path')

        gdal.BuildVRT(str(vrt), hand_file_paths)

calculate

Calculate Height Above Nearest Drainage (HAND) from the Copernicus GLO-30 DEM

calculate_hand(dem_array, dem_affine, dem_crs, basin_mask, acc_thresh=100)

Calculate the Height Above Nearest Drainage (HAND)

Calculate the Height Above Nearest Drainage (HAND) using pySHEDS library. Because HAND is tied to watershed boundaries (hydrobasins), clipped/cut basins will produce weird edge effects, and incomplete basins should be masked out. For watershed boundaries, see: https://www.hydrosheds.org/page/hydrobasins

This involves: * Filling pits (single-cells lower than their surrounding neighbors) in the Digital Elevation Model (DEM) * Filling depressions (regions of cells lower than their surrounding neighbors) in the Digital Elevation Model (DEM) * Resolving un-drainable flats * Determining the flow direction using the ESRI D8 routing scheme * Determining flow accumulation (number of upstream cells) * Creating a drainage mask using the accumulation threshold acc_thresh * Calculating HAND

In the HAND calculation, NaNs inside the basin filled using fill_hand

Parameters:

Name Type Description Default
dem_array

DEM to calculate HAND for

required
dem_crs CRS

DEM Coordinate Reference System (CRS)

required
dem_affine Affine

DEM Affine geotransform

required
basin_mask

Array of booleans indicating wither an element should be masked out (Ă  la Numpy Masked Arrays: https://numpy.org/doc/stable/reference/maskedarray.generic.html#what-is-a-masked-array)

required
acc_thresh int | None

Accumulation threshold for determining the drainage mask. If None, the mean accumulation value is used

100
Source code in asf_tools/hydrosar/hand/calculate.py
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
def calculate_hand(
    dem_array,
    dem_affine: rasterio.Affine,
    dem_crs: rasterio.crs.CRS,
    basin_mask,
    acc_thresh: int | None = 100,
):
    """Calculate the Height Above Nearest Drainage (HAND)

     Calculate the Height Above Nearest Drainage (HAND) using pySHEDS library. Because HAND
     is tied to watershed boundaries (hydrobasins), clipped/cut basins will produce weird edge
     effects, and incomplete basins should be masked out. For watershed boundaries,
     see: https://www.hydrosheds.org/page/hydrobasins

     This involves:
        * Filling pits (single-cells lower than their surrounding neighbors)
            in the Digital Elevation Model (DEM)
        * Filling depressions (regions of cells lower than their surrounding neighbors)
            in the Digital Elevation Model (DEM)
        * Resolving un-drainable flats
        * Determining the flow direction using the ESRI D8 routing scheme
        * Determining flow accumulation (number of upstream cells)
        * Creating a drainage mask using the accumulation threshold `acc_thresh`
        * Calculating HAND

    In the HAND calculation, NaNs inside the basin filled using `fill_hand`

    Args:
        dem_array: DEM to calculate HAND for
        dem_crs: DEM Coordinate Reference System (CRS)
        dem_affine: DEM Affine geotransform
        basin_mask: Array of booleans indicating wither an element should be masked out (Ă  la Numpy Masked Arrays:
            https://numpy.org/doc/stable/reference/maskedarray.generic.html#what-is-a-masked-array)
        acc_thresh: Accumulation threshold for determining the drainage mask.
            If `None`, the mean accumulation value is used
    """
    nodata_fill_value = np.finfo(float).eps
    with NamedTemporaryFile() as temp_file:
        write_cog(
            temp_file.name,
            dem_array,
            transform=dem_affine.to_gdal(),
            epsg_code=dem_crs.to_epsg(),
            # Prevents PySheds from assuming using zero as the nodata value
            nodata_value=nodata_fill_value,
        )

        # From PySheds; see example usage: http://mattbartos.com/pysheds/
        grid = sGrid.from_raster(str(temp_file.name))
        dem = grid.read_raster(str(temp_file.name))

    log.info('Fill pits in DEM')
    pit_filled_dem = grid.fill_pits(dem)

    log.info('Filling depressions')
    flooded_dem = grid.fill_depressions(pit_filled_dem)
    del pit_filled_dem

    log.info('Resolving flats')
    inflated_dem = grid.resolve_flats(flooded_dem)
    del flooded_dem

    log.info('Obtaining flow direction')
    flow_dir = grid.flowdir(inflated_dem, apply_mask=True)

    log.info('Calculating flow accumulation')
    acc = grid.accumulation(flow_dir)

    if acc_thresh is None:
        acc_thresh = acc.mean()

    log.info(f'Calculating HAND using accumulation threshold of {acc_thresh}')
    hand = grid.compute_hand(flow_dir, inflated_dem, acc > acc_thresh, inplace=False)

    if np.isnan(hand).any():
        log.info('Filling NaNs in the HAND')
        # mask outside of basin with a not-NaN value to prevent NaN-filling outside of basin (optimization)
        hand[basin_mask] = nodata_fill_value
        hand = fill_hand(hand, dem_array)

    # set pixels outside of basin to nodata
    hand[basin_mask] = np.nan

    # TODO: also mask ocean pixels here?

    return hand
calculate_hand_for_basins(out_raster, geometries, dem_file, acc_thresh=100)

Calculate the Height Above Nearest Drainage (HAND) for watershed boundaries (hydrobasins).

For watershed boundaries, see: https://www.hydrosheds.org/page/hydrobasins

Parameters:

Name Type Description Default
out_raster str | Path

HAND GeoTIFF to create

required
geometries GeometryCollection

watershed boundary (hydrobasin) polygons to calculate HAND over

required
dem_file str | Path

DEM raster covering (containing) geometries

required
acc_thresh int | None

Accumulation threshold for determining the drainage mask. If None, the mean accumulation value is used

100
Source code in asf_tools/hydrosar/hand/calculate.py
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
def calculate_hand_for_basins(
    out_raster: str | Path,
    geometries: GeometryCollection,
    dem_file: str | Path,
    acc_thresh: int | None = 100,
):
    """Calculate the Height Above Nearest Drainage (HAND) for watershed boundaries (hydrobasins).

    For watershed boundaries, see: https://www.hydrosheds.org/page/hydrobasins

    Args:
        out_raster: HAND GeoTIFF to create
        geometries: watershed boundary (hydrobasin) polygons to calculate HAND over
        dem_file: DEM raster covering (containing) `geometries`
        acc_thresh: Accumulation threshold for determining the drainage mask.
            If `None`, the mean accumulation value is used
    """
    with rasterio.open(dem_file) as src:
        basin_mask, basin_affine_tf, basin_window = rasterio.mask.raster_geometry_mask(
            src, geometries.geoms, all_touched=True, crop=True, pad=True, pad_width=1
        )
        basin_array = src.read(1, window=basin_window)

        hand = calculate_hand(basin_array, basin_affine_tf, src.crs, basin_mask, acc_thresh=acc_thresh)

        write_cog(
            out_raster,
            hand,
            transform=basin_affine_tf.to_gdal(),
            epsg_code=src.crs.to_epsg(),
            nodata_value=np.nan,
        )
fill_hand(hand, dem)

Replace NaNs in a HAND array with values interpolated from their neighbor's HOND

Replace NaNs in a HAND array with values interpolated from their neighbor's HOND (height of nearest drainage) using a 2D Gaussian kernel. Here, HOND is defined as the DEM value less the HAND value. For the kernel, see: https://docs.astropy.org/en/stable/convolution/#using-astropy-s-convolution-to-replace-bad-data

Source code in asf_tools/hydrosar/hand/calculate.py
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
def fill_hand(hand: np.ndarray, dem: np.ndarray):
    """Replace NaNs in a HAND array with values interpolated from their neighbor's HOND

    Replace NaNs in a HAND array with values interpolated from their neighbor's HOND (height of nearest drainage)
    using a 2D Gaussian kernel. Here, HOND is defined as the DEM value less the HAND value. For the kernel, see:
    https://docs.astropy.org/en/stable/convolution/#using-astropy-s-convolution-to-replace-bad-data
    """
    hond = dem - hand
    hond = fill_nan(hond)

    hand_mask = np.isnan(hand)
    hand[hand_mask] = dem[hand_mask] - hond[hand_mask]
    hand[hand < 0] = 0

    return hand
fill_nan(array)

Replace NaNs with values interpolated from their neighbors

Replace NaNs with values interpolated from their neighbors using a 2D Gaussian kernel, see: https://docs.astropy.org/en/stable/convolution/#using-astropy-s-convolution-to-replace-bad-data

Source code in asf_tools/hydrosar/hand/calculate.py
25
26
27
28
29
30
31
32
33
34
35
36
37
def fill_nan(array: np.ndarray) -> np.ndarray:
    """Replace NaNs with values interpolated from their neighbors

    Replace NaNs with values interpolated from their neighbors using a 2D Gaussian
    kernel, see: https://docs.astropy.org/en/stable/convolution/#using-astropy-s-convolution-to-replace-bad-data
    """
    kernel = astropy.convolution.Gaussian2DKernel(x_stddev=3)  # kernel x_size=8*stddev
    with warnings.catch_warnings():
        warnings.simplefilter('ignore')
        while np.any(np.isnan(array)):
            array = astropy.convolution.interpolate_replace_nans(array, kernel, convolve=astropy.convolution.convolve)

    return array
make_copernicus_hand(out_raster, vector_file, acc_thresh=100)

Copernicus GLO-30 Height Above Nearest Drainage (HAND)

Make a Height Above Nearest Drainage (HAND) GeoTIFF from the Copernicus GLO-30 DEM covering the watershed boundaries (hydrobasins) defined in a vector file.

For watershed boundaries, see: https://www.hydrosheds.org/page/hydrobasins

Parameters:

Name Type Description Default
out_raster str | Path

HAND GeoTIFF to create

required
vector_file str | Path

Vector file of watershed boundary (hydrobasin) polygons to calculate HAND over

required
acc_thresh int | None

Accumulation threshold for determining the drainage mask. If None, the mean accumulation value is used

100
Source code in asf_tools/hydrosar/hand/calculate.py
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
def make_copernicus_hand(
    out_raster: str | Path,
    vector_file: str | Path,
    acc_thresh: int | None = 100,
):
    """Copernicus GLO-30 Height Above Nearest Drainage (HAND)

    Make a Height Above Nearest Drainage (HAND) GeoTIFF from the Copernicus GLO-30 DEM
    covering the watershed boundaries (hydrobasins) defined in a vector file.

    For watershed boundaries, see: https://www.hydrosheds.org/page/hydrobasins

    Args:
        out_raster: HAND GeoTIFF to create
        vector_file: Vector file of watershed boundary (hydrobasin) polygons to calculate HAND over
        acc_thresh: Accumulation threshold for determining the drainage mask.
            If `None`, the mean accumulation value is used
    """
    with fiona.open(vector_file) as vds:
        geometries = GeometryCollection([shape(feature['geometry']) for feature in vds])

    with NamedTemporaryFile(suffix='.vrt', delete=False) as dem_vrt:
        prepare_dem_vrt(dem_vrt.name, geometries)
        calculate_hand_for_basins(out_raster, geometries, dem_vrt.name, acc_thresh=acc_thresh)

prepare

Prepare a Height Above Nearest Drainage (HAND) virtual raster (VRT) covering a given geometry

prepare_hand_for_raster(hand_raster, source_raster, resampling_method='lanczos')

Create a HAND raster pixel-aligned to a source raster

Parameters:

Name Type Description Default
hand_raster str | Path

Path for the output HAND raster

required
source_raster str | Path

Path for the source raster

required
resampling_method str

Name of the resampling method to use. For available methods, see: https://gdal.org/programs/gdalwarp.html#cmdoption-gdalwarp-r

'lanczos'
Source code in asf_tools/hydrosar/hand/prepare.py
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
def prepare_hand_for_raster(
    hand_raster: str | Path,
    source_raster: str | Path,
    resampling_method: str = 'lanczos',
):
    """Create a HAND raster pixel-aligned to a source raster

    Args:
        hand_raster: Path for the output HAND raster
        source_raster: Path for the source raster
        resampling_method: Name of the resampling method to use. For available methods, see:
            https://gdal.org/programs/gdalwarp.html#cmdoption-gdalwarp-r
    """
    info = gdal.Info(str(source_raster), format='json')

    hand_geometry = shape(info['wgs84Extent'])
    hand_bounds = [
        info['cornerCoordinates']['upperLeft'][0],
        info['cornerCoordinates']['lowerRight'][1],
        info['cornerCoordinates']['lowerRight'][0],
        info['cornerCoordinates']['upperLeft'][1],
    ]

    with NamedTemporaryFile(suffix='.vrt', delete=False) as hand_vrt:
        prepare_hand_vrt(hand_vrt.name, hand_geometry)
        gdal.Warp(
            str(hand_raster),
            hand_vrt.name,
            dstSRS=f'EPSG:{get_epsg_code(info)}',
            outputBounds=hand_bounds,
            width=info['size'][0],
            height=info['size'][1],
            resampleAlg=Resampling[resampling_method].value,
        )
prepare_hand_vrt(vrt, geometry)

Prepare a HAND mosaic VRT covering a given geometry

Prepare a Height Above Nearest Drainage (HAND) virtual raster (VRT) covering a given geometry. The Height Above Nearest Drainage (HAND) mosaic is assembled from the HAND tiles that intersect the geometry, using a HAND derived from the Copernicus GLO-30 DEM.

Note: asf_tools does not currently support geometries that cross the antimeridian.

Parameters:

Name Type Description Default
vrt str | Path

Path for the output VRT file

required
geometry Geometry | BaseGeometry

Geometry in EPSG:4326 (lon/lat) projection for which to prepare a HAND mosaic

required
Source code in asf_tools/hydrosar/hand/prepare.py
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
def prepare_hand_vrt(vrt: str | Path, geometry: ogr.Geometry | BaseGeometry):
    """Prepare a HAND mosaic VRT covering a given geometry

    Prepare a Height Above Nearest Drainage (HAND) virtual raster (VRT) covering a given geometry.
    The Height Above Nearest Drainage (HAND) mosaic is assembled from the HAND tiles that intersect
    the geometry, using a HAND derived from the Copernicus GLO-30 DEM.

    Note: `asf_tools` does not currently support geometries that cross the antimeridian.

    Args:
        vrt: Path for the output VRT file
        geometry: Geometry in EPSG:4326 (lon/lat) projection for which to prepare a HAND mosaic

    """
    with GDALConfigManager(GDAL_DISABLE_READDIR_ON_OPEN='EMPTY_DIR'):
        if isinstance(geometry, BaseGeometry):
            geometry = ogr.CreateGeometryFromWkb(geometry.wkb)

        min_lon, max_lon, _, _ = geometry.GetEnvelope()
        if min_lon < -160.0 and max_lon > 160.0:
            raise ValueError(f'asf_tools does not currently support geometries that cross the antimeridian: {geometry}')

        tile_features = vector.get_features(HAND_GEOJSON)
        if not vector.get_property_values_for_intersecting_features(geometry, tile_features):
            raise ValueError(f'Copernicus GLO-30 HAND does not intersect this geometry: {geometry}')

        hand_file_paths = vector.intersecting_feature_properties(geometry, tile_features, 'file_path')

        gdal.BuildVRT(str(vrt), hand_file_paths)

threshold

expectation_maximization_threshold(tile, number_of_classes=3)

Water threshold Calculation using a multi-mode Expectation Maximization Approach

Thresholding works best when backscatter tiles are provided on a decibel scale to get Gaussian distribution that is scaled to a range of 0-255, and performed on a small tile that is likely to have a transition between water and not water.

Parameters:

Name Type Description Default
tile ndarray

array of backscatter values for a tile from an RTC raster

required
number_of_classes int

classify the tile into this many classes. Typically, three classes capture: (1) urban and bright slopes, (2) average brightness farmland, and (3) water, as is often seen in the US Midwest.

3

Returns:

Name Type Description
threshold float

threshold value that can be used to create a water extent map

Source code in asf_tools/hydrosar/threshold.py
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
def expectation_maximization_threshold(tile: np.ndarray, number_of_classes: int = 3) -> float:
    """Water threshold Calculation using a multi-mode Expectation Maximization Approach

    Thresholding works best when backscatter tiles are provided on a decibel scale
    to get Gaussian distribution that is scaled to a range of 0-255, and performed
    on a small tile that is likely to have a transition between water and not water.


    Args:
        tile: array of backscatter values for a tile from an RTC raster
        number_of_classes: classify the tile into this many classes. Typically, three
            classes capture: (1) urban and bright slopes, (2) average brightness farmland,
            and (3) water, as is often seen in the US Midwest.

    Returns:
        threshold: threshold value that can be used to create a water extent map
    """
    image_copy = tile.copy()
    image_copy2 = np.ma.filled(tile.astype(float), np.nan)  # needed for valid posterior_lookup keys
    image = tile.flatten()
    minimum = np.amin(image)
    image = image - minimum + 1
    maximum = np.amax(image)

    histogram = _make_histogram(image)
    nonzero_indices = np.nonzero(histogram)[0]
    histogram = histogram[nonzero_indices]
    histogram = histogram.flatten()
    class_means = (np.arange(number_of_classes) + 1) * maximum / (number_of_classes + 1)
    class_variances = np.ones(number_of_classes) * maximum
    class_proportions = np.ones(number_of_classes) * 1 / number_of_classes
    sml = np.mean(np.diff(nonzero_indices)) / 1000
    iteration = 0
    while True:
        class_likelihood = _make_distribution(class_means, class_variances, class_proportions, nonzero_indices)
        sum_likelihood = np.sum(class_likelihood, 1) + np.finfo(class_likelihood[0][0]).eps
        log_likelihood = np.sum(histogram * np.log(sum_likelihood))
        for j in range(0, number_of_classes):
            class_posterior_probability = histogram * class_likelihood[:, j] / sum_likelihood
            class_proportions[j] = np.sum(class_posterior_probability)
            class_means[j] = np.sum(nonzero_indices * class_posterior_probability) / class_proportions[j]
            vr = nonzero_indices - class_means[j]
            class_variances[j] = np.sum(vr * vr * class_posterior_probability) / class_proportions[j] + sml
            del class_posterior_probability, vr
        class_proportions += 1e-3
        class_proportions /= np.sum(class_proportions)
        class_likelihood = _make_distribution(class_means, class_variances, class_proportions, nonzero_indices)
        sum_likelihood = np.sum(class_likelihood, 1) + np.finfo(class_likelihood[0, 0]).eps
        del class_likelihood
        new_log_likelihood = np.sum(histogram * np.log(sum_likelihood))
        del sum_likelihood
        if (new_log_likelihood - log_likelihood) < 0.000001:
            break
        iteration += 1
    del log_likelihood, new_log_likelihood
    class_means = class_means + minimum - 1
    s = image_copy.shape
    posterior = np.zeros((s[0], s[1], number_of_classes))
    posterior_lookup: dict = dict()
    for i in range(0, s[0]):
        for j in range(0, s[1]):
            pixel_val = image_copy2[i, j]
            if pixel_val in posterior_lookup:
                for n in range(0, number_of_classes):
                    posterior[i, j, n] = posterior_lookup[pixel_val][n]
            else:
                posterior_lookup.update({pixel_val: [0] * number_of_classes})
                for n in range(0, number_of_classes):
                    x = _make_distribution(
                        class_means[n],
                        class_variances[n],
                        class_proportions[n],
                        image_copy[i, j],
                    )
                    posterior[i, j, n] = x * class_proportions[n]
                    posterior_lookup[pixel_val][n] = posterior[i, j, n]

    sorti = np.argsort(class_means)
    xvec = np.arange(class_means[sorti[0]], class_means[sorti[1]], step=0.05)
    x1 = _make_distribution(
        class_means[sorti[0]],
        class_variances[sorti[0]],
        class_proportions[sorti[0]],
        xvec,
    )
    x2 = _make_distribution(
        class_means[sorti[1]],
        class_variances[sorti[1]],
        class_proportions[sorti[1]],
        xvec,
    )
    dx = np.abs(x1 - x2)

    return xvec[np.argmin(dx)]

water_map

Generate surface water maps from Sentinel-1 RTC products

Create a surface water extent map from a dual-pol Sentinel-1 RTC product and a HAND image. The HAND image must be pixel-aligned (same extent and size) to the RTC images. The water extent maps are created using an adaptive Expectation Maximization thresholding approach and refined using Fuzzy Logic.

format_raster_data(raster, padding_mask=None, nodata=np.iinfo(np.uint8).max)

Ensure raster data is uint8 and set the area outside the valid data to nodata

Source code in asf_tools/hydrosar/water_map.py
149
150
151
152
153
154
155
156
157
def format_raster_data(raster, padding_mask=None, nodata=np.iinfo(np.uint8).max):
    """Ensure raster data is uint8 and set the area outside the valid data to nodata"""
    if padding_mask is None:
        array = read_as_masked_array(raster)
        padding_mask = array.mask
    raster = raster.astype(np.uint8)
    raster[padding_mask] = nodata

    return raster

make_water_map(out_raster, vv_raster, vh_raster, hand_raster=None, tile_shape=(100, 100), max_vv_threshold=-15.5, max_vh_threshold=-23.0, hand_threshold=15.0, hand_fraction=0.8, membership_threshold=0.45)

Creates a surface water extent map from a Sentinel-1 RTC product

Create a surface water extent map from a dual-pol Sentinel-1 RTC product and a HAND image. The HAND image must be pixel-aligned (same extent and size) to the RTC images. The water extent maps are created using an adaptive Expectation Maximization thresholding approach and refined with Fuzzy Logic.

The input images are broken into a set of corresponding tiles with a shape of tile_shape, and a set of tiles are selected from the VH RTC image that contain water boundaries to determine an appropriate water threshold. Candidate tiles must meet these criteria: * hand_fraction of pixels within a tile must have HAND pixel values lower than hand_threshold * The median backscatter value for the tile must be lower than an average tiles' backscatter values * The tile must have a high variance -- high variance is considered initially to be a variance in the 95th percentile of the tile variances, but progressively relaxed to the 5th percentile if there not at least 5 candidate tiles.

The 5 VH tiles with the highest variance are selected for thresholding and a water threshold value is determined using an Expectation Maximization approach. If there were not enough candidate tiles or the threshold is too high, max_vh_threshold and/or max_vv_threshold will be used instead.

From the initial threshold-based water extent maps, Fuzzy Logic is used to remove spurious false detections and improve the water extent map quality. The fuzzy logic uses these indicators for the presence of water: * radar cross section in a pixel relative to the determined detection threshold * the height above nearest drainage (HAND) * the surface slope, which is derived from the HAND data * the size of the detected water feature

For each indicator, a Z-shaped activation function is used to determine pixel membership. The membership maps are combined to form the final water extent map. Pixels classified as water pixels will: * have non-zero membership in all of the indicators, and * have an average membership above the membership_threshold value.

Finally, the VV and VH water masks will be combined to include all water pixels from both masks, and the combined water map will be written to out_raster.

Parameters:

Name Type Description Default
out_raster str | Path

Water map GeoTIFF to create

required
vv_raster str | Path

Sentinel-1 RTC GeoTIFF, in power scale, with VV polarization

required
vh_raster str | Path

Sentinel-1 RTC GeoTIFF, in power scale, with VH polarization

required
hand_raster str | Path | None

Height Above Nearest Drainage (HAND) GeoTIFF aligned to the RTC rasters

None
tile_shape tuple[int, int]

shape (height, width) in pixels to tile the image to

(100, 100)
max_vv_threshold float

Maximum threshold value to use for vv_raster in decibels (db)

-15.5
max_vh_threshold float

Maximum threshold value to use for vh_raster in decibels (db)

-23.0
hand_threshold float

The maximum height above nearest drainage in meters to consider a pixel valid

15.0
hand_fraction float

The minimum fraction of valid HAND pixels required in a tile for thresholding

0.8
membership_threshold float

The average membership to the fuzzy indicators required for a water pixel

0.45
Source code in asf_tools/hydrosar/water_map.py
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
def make_water_map(
    out_raster: str | Path,
    vv_raster: str | Path,
    vh_raster: str | Path,
    hand_raster: str | Path | None = None,
    tile_shape: tuple[int, int] = (100, 100),
    max_vv_threshold: float = -15.5,
    max_vh_threshold: float = -23.0,
    hand_threshold: float = 15.0,
    hand_fraction: float = 0.8,
    membership_threshold: float = 0.45,
):
    """Creates a surface water extent map from a Sentinel-1 RTC product

    Create a surface water extent map from a dual-pol Sentinel-1 RTC product and
    a HAND image. The HAND image must be pixel-aligned (same extent and size) to
    the RTC images. The water extent maps are created using an adaptive Expectation
    Maximization thresholding approach and refined with Fuzzy Logic.

    The input images are broken into a set of corresponding tiles with a shape of
    `tile_shape`, and a set of tiles are selected from the VH RTC
    image that contain water boundaries to determine an appropriate water threshold.
     Candidate tiles must meet these criteria:
    * `hand_fraction` of pixels within a tile must have HAND pixel values lower
      than `hand_threshold`
    * The median backscatter value for the tile must be lower than an average tiles'
      backscatter values
    * The tile must have a high variance -- high variance is considered initially to
      be a variance in the 95th percentile of the tile variances, but progressively
      relaxed to the 5th percentile if there not at least 5 candidate tiles.

    The 5 VH tiles with the highest variance are selected for thresholding and a
    water threshold value is determined using an Expectation Maximization approach.
    If there were not enough candidate tiles or the threshold is too high,
    `max_vh_threshold` and/or `max_vv_threshold` will be used instead.

    From the initial threshold-based water extent maps, Fuzzy Logic is used to remove
    spurious false detections and improve the water extent map quality. The fuzzy logic
    uses these indicators for the presence of water:
    * radar cross section in a pixel relative to the determined detection threshold
    * the height above nearest drainage (HAND)
    * the surface slope, which is derived from the HAND data
    * the size of the detected water feature

    For each indicator, a Z-shaped activation function is used to determine pixel membership.
    The membership maps are combined to form the final water extent map. Pixels classified
    as water pixels will:
    * have non-zero membership in all of the indicators, and
    * have an average membership above the `membership_threshold` value.

    Finally, the VV and VH water masks will be combined to include all water pixels
    from both masks, and the combined water map will be written to `out_raster`.

    Args:
        out_raster: Water map GeoTIFF to create
        vv_raster: Sentinel-1 RTC GeoTIFF, in power scale, with VV polarization
        vh_raster: Sentinel-1 RTC GeoTIFF, in power scale, with VH polarization
        hand_raster: Height Above Nearest Drainage (HAND) GeoTIFF aligned to the RTC rasters
        tile_shape: shape (height, width) in pixels to tile the image to
        max_vv_threshold: Maximum threshold value to use for `vv_raster` in decibels (db)
        max_vh_threshold:  Maximum threshold value to use for `vh_raster` in decibels (db)
        hand_threshold: The maximum height above nearest drainage in meters to consider
            a pixel valid
        hand_fraction: The minimum fraction of valid HAND pixels required in a tile for
            thresholding
        membership_threshold: The average membership to the fuzzy indicators required for a water pixel
    """
    if tile_shape[0] % 2 or tile_shape[1] % 2:
        raise ValueError(f'tile_shape {tile_shape} requires even values.')

    info = gdal.Info(str(vh_raster), format='json')

    out_transform = info['geoTransform']
    out_epsg = get_epsg_code(info)

    if hand_raster is None:
        hand_raster = str(out_raster).replace('.tif', '_HAND.tif')
        log.info(f'Extracting HAND data to: {hand_raster}')
        prepare_hand_for_raster(hand_raster, vh_raster)

    log.info(f'Determining HAND memberships from {hand_raster}')
    hand_array = read_as_masked_array(hand_raster)
    hand_tiles = tile_array(hand_array, tile_shape=tile_shape, pad_value=np.nan)

    hand_candidates = select_hand_tiles(hand_tiles, hand_threshold, hand_fraction)
    log.debug(f'Selected HAND tile candidates {hand_candidates}')

    selected_tiles = None
    nodata = np.iinfo(np.uint8).max
    water_extent_maps = []
    for max_db_threshold, raster, pol in (
        (max_vh_threshold, vh_raster, 'VH'),
        (max_vv_threshold, vv_raster, 'VV'),
    ):
        log.info(f'Creating initial {pol} water extent map from {raster}')
        array = read_as_masked_array(raster)
        padding_mask = array.mask
        tiles = tile_array(array, tile_shape=tile_shape, pad_value=0.0)
        # Masking less than zero only necessary for old HyP3/GAMMA products which sometimes returned negative powers
        tiles = np.ma.masked_less_equal(tiles, 0.0)
        if selected_tiles is None:
            selected_tiles = select_backscatter_tiles(tiles, hand_candidates)
            log.info(f'Selected tiles {selected_tiles} from {raster}')

        with np.testing.suppress_warnings() as sup:
            sup.filter(RuntimeWarning)  # invalid value and divide by zero encountered in log10
            tiles = np.log10(tiles) + 30.0  # linear power scale -> Gaussian scale optimized for thresholding
        max_gaussian_threshold = max_db_threshold / 10.0 + 30.0  # db -> Gaussian scale optimized for thresholding
        if selected_tiles.size:
            scaling = 256 / (np.mean(tiles) + 3 * np.std(tiles))
            gaussian_threshold = determine_em_threshold(tiles[selected_tiles, :, :], scaling)
            threshold_db = 10.0 * (gaussian_threshold - 30.0)
            log.info(f'Threshold determined to be {threshold_db} db')
            if gaussian_threshold > max_gaussian_threshold:
                log.warning(f'Threshold too high! Using maximum threshold {max_db_threshold} db')
                gaussian_threshold = max_gaussian_threshold
        else:
            log.warning(f'Tile selection did not converge! using default threshold {max_db_threshold} db')
            gaussian_threshold = max_gaussian_threshold

        gaussian_array = untile_array(tiles, array.shape)
        water_map = np.ma.masked_less_equal(gaussian_array, gaussian_threshold).mask
        water_map &= ~array.mask

        write_cog(
            str(out_raster).replace('.tif', f'_{pol}_initial.tif'),
            format_raster_data(water_map, padding_mask, nodata),
            transform=out_transform,
            epsg_code=out_epsg,
            dtype=gdal.GDT_Byte,
            nodata_value=nodata,
        )

        log.info(f'Refining initial {pol} water extent map using Fuzzy Logic')
        array = np.ma.masked_where(~water_map, array)
        gaussian_lower_limit = np.log10(np.ma.median(array)) + 30.0

        water_map = fuzzy_refinement(
            water_map,
            gaussian_array,
            hand_array,
            pixel_size=out_transform[1],
            gaussian_thresholds=(gaussian_lower_limit, gaussian_threshold),
            membership_threshold=membership_threshold,
        )
        water_map &= ~array.mask

        write_cog(
            str(out_raster).replace('.tif', f'_{pol}_fuzzy.tif'),
            format_raster_data(water_map, padding_mask, nodata),
            transform=out_transform,
            epsg_code=out_epsg,
            dtype=gdal.GDT_Byte,
            nodata_value=nodata,
        )

        water_extent_maps.append(water_map)

    log.info('Combining Fuzzy VH and VV extent map')
    combined_water_map = np.logical_or(*water_extent_maps)

    combined_segments = measure.label(combined_water_map, connectivity=2)
    combined_water_map = remove_small_segments(combined_segments)

    write_cog(
        out_raster,
        format_raster_data(combined_water_map, padding_mask, nodata),
        transform=out_transform,
        epsg_code=out_epsg,
        dtype=gdal.GDT_Byte,
        nodata_value=nodata,
    )

raster

convert_scale(array, in_scale, out_scale)

Convert calibrated raster scale between db, amplitude and power

Source code in asf_tools/raster.py
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
def convert_scale(
    array: np.ndarray | np.ma.MaskedArray,
    in_scale: Literal['db', 'amplitude', 'power'],
    out_scale: Literal['db', 'amplitude', 'power'],
) -> np.ndarray | np.ma.MaskedArray:
    """Convert calibrated raster scale between db, amplitude and power"""
    if in_scale == out_scale:
        warnings.warn(f'Nothing to do! {in_scale} is same as {out_scale}.')
        return array

    log10 = np.ma.log10 if isinstance(array, np.ma.MaskedArray) else np.log10

    if in_scale == 'db':
        if out_scale == 'power':
            return 10 ** (array / 10)
        if out_scale == 'amplitude':
            return 10 ** (array / 20)

    if in_scale == 'amplitude':
        if out_scale == 'power':
            return array**2
        if out_scale == 'db':
            return 10 * log10(array**2)

    if in_scale == 'power':
        if out_scale == 'amplitude':
            return np.sqrt(array)
        if out_scale == 'db':
            return 10 * log10(array)

    raise ValueError(f'Cannot convert raster of scale {in_scale} to {out_scale}')

read_as_array(raster, band=1)

Reads data from a raster image into memory

Parameters:

Name Type Description Default
raster str

The file path to a raster image

required
band int

The raster band to read

1

Returns:

Name Type Description
data ndarray

The raster pixel data as a numpy array

Source code in asf_tools/raster.py
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
def read_as_array(raster: str, band: int = 1) -> np.ndarray:
    """Reads data from a raster image into memory

    Args:
        raster: The file path to a raster image
        band: The raster band to read

    Returns:
        data: The raster pixel data as a numpy array
    """
    log.debug(f'Reading raster values from {raster}')
    ds = gdal.Open(raster)
    data = ds.GetRasterBand(band).ReadAsArray()
    del ds  # How to close w/ gdal
    return data

read_as_masked_array(raster, band=1)

Reads data from a raster image into memory, masking invalid and NoData values

Parameters:

Name Type Description Default
raster str | Path

The file path to a raster image

required
band int

The raster band to read

1

Returns:

Name Type Description
data MaskedArray

The raster pixel data as a numpy MaskedArray

Source code in asf_tools/raster.py
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
def read_as_masked_array(raster: str | Path, band: int = 1) -> np.ma.MaskedArray:
    """Reads data from a raster image into memory, masking invalid and NoData values

    Args:
        raster: The file path to a raster image
        band: The raster band to read

    Returns:
        data: The raster pixel data as a numpy MaskedArray
    """
    log.debug(f'Reading raster values from {raster}')
    ds = gdal.Open(str(raster))
    raster_band = ds.GetRasterBand(band)
    data = np.ma.masked_invalid(raster_band.ReadAsArray())
    nodata = raster_band.GetNoDataValue()
    if nodata is not None:
        return np.ma.masked_values(data, nodata)
    del ds  # How to close w/ gdal
    return data

write_cog(file_name, data, transform, epsg_code, dtype=gdal.GDT_Float32, nodata_value=None)

Creates a Cloud Optimized GeoTIFF

Parameters:

Name Type Description Default
file_name str | Path

The output file name

required
data ndarray

The raster data

required
transform list[float]

The geotransform for the output GeoTIFF

required
epsg_code int

The integer EPSG code for the output GeoTIFF projection

required
dtype

The pixel data type for the output GeoTIFF

GDT_Float32
nodata_value

The NODATA value for the output Geotiff

None

Returns:

Name Type Description
file_name

The output file name

Source code in asf_tools/raster.py
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
def write_cog(
    file_name: str | Path,
    data: np.ndarray,
    transform: list[float],
    epsg_code: int,
    dtype=gdal.GDT_Float32,
    nodata_value=None,
):
    """Creates a Cloud Optimized GeoTIFF

    Args:
        file_name: The output file name
        data: The raster data
        transform: The geotransform for the output GeoTIFF
        epsg_code: The integer EPSG code for the output GeoTIFF projection
        dtype: The pixel data type for the output GeoTIFF
        nodata_value: The NODATA value for the output Geotiff

    Returns:
        file_name: The output file name
    """
    log.info(f'Creating {file_name}')

    with NamedTemporaryFile() as temp_file:
        driver = gdal.GetDriverByName('GTiff')
        temp_geotiff = driver.Create(temp_file.name, data.shape[1], data.shape[0], 1, dtype)
        temp_geotiff.GetRasterBand(1).WriteArray(data)
        if nodata_value is not None:
            temp_geotiff.GetRasterBand(1).SetNoDataValue(nodata_value)
        temp_geotiff.SetGeoTransform(transform)
        temp_geotiff.SetProjection(epsg_to_wkt(epsg_code))

        driver = gdal.GetDriverByName('COG')
        options = [
            'COMPRESS=LZW',
            'OVERVIEW_RESAMPLING=AVERAGE',
            'NUM_THREADS=ALL_CPUS',
            'BIGTIFF=YES',
        ]
        driver.CreateCopy(str(file_name), temp_geotiff, options=options)

        del temp_geotiff  # How to close w/ gdal
    return file_name

tile

tile_array(array, tile_shape=(200, 200), pad_value=None)

Tile a 2D numpy array

Turn a 2D numpy array like

array = [[0, 0, 1, 1], ... [0, 0, 1, 1], ... [2, 2, 3, 3], ... [2, 2, 3, 3]] array.shape (4, 4)

into a tiled array like

tiles = tiled_array(array, 2, 2) print(tiles) [[[0, 0], [0, 0]], [[1, 1], [1, 1]], [[2, 2], [2, 2]], [[3, 3], [3, 3]]] tiles.shape (4, 2, 2)

Parameters:

Name Type Description Default
array ndarray | MaskedArray

2D array to tile

required
tile_shape tuple[int, int]

the shape of each tile

(200, 200)
pad_value float | None

right-bottom pad a with pad as needed so a is evenly divisible into tiles

None

Returns:

Type Description
ndarray | MaskedArray

the tiled array

Source code in asf_tools/tile.py
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
def tile_array(
    array: np.ndarray | np.ma.MaskedArray,
    tile_shape: tuple[int, int] = (200, 200),
    pad_value: float | None = None,
) -> np.ndarray | np.ma.MaskedArray:
    """Tile a 2D numpy array

    Turn a 2D numpy array like:
        >>> array = [[0, 0, 1, 1],
        ...          [0, 0, 1, 1],
        ...          [2, 2, 3, 3],
        ...          [2, 2, 3, 3]]
        >>> array.shape
        (4, 4)

    into a tiled array like:
        >>> tiles = tiled_array(array, 2, 2)
        >>> print(tiles)
        [[[0, 0],
          [0, 0]],
         [[1, 1],
          [1, 1]],
         [[2, 2],
          [2, 2]],
         [[3, 3],
          [3, 3]]]
        >>> tiles.shape
        (4, 2, 2)

    Args:
        array: 2D array to tile
        tile_shape: the shape of each tile
        pad_value: right-bottom pad `a` with `pad` as needed so `a` is evenly divisible into tiles

    Returns:
        the tiled array
    """
    array_rows, array_columns = array.shape
    tile_rows, tile_columns = tile_shape

    # CREDIT: https://twitter.com/LizzUltee/status/1379508448262512641
    rpad = -array_rows % tile_rows
    cpad = -array_columns % tile_columns

    if (rpad or cpad) and pad_value is None:
        raise ValueError(f'Cannot evenly tile a {array.shape} array into ({tile_rows},{tile_columns}) tiles')

    if rpad or cpad:
        assert pad_value is not None
        padded_array = np.pad(array, ((0, rpad), (0, cpad)), constant_values=pad_value)
        if isinstance(array, np.ma.MaskedArray):
            mask = np.pad(array.mask, ((0, rpad), (0, cpad)), constant_values=True)
            padded_array = np.ma.MaskedArray(padded_array, mask=mask)
    else:
        padded_array = array

    tile_list = []
    for rows in np.vsplit(padded_array, range(tile_rows, array_rows, tile_rows)):
        tile_list.extend(np.hsplit(rows, range(tile_columns, array_columns, tile_columns)))

    dstack = np.ma.dstack if isinstance(array, np.ma.MaskedArray) else np.dstack
    tiled = np.moveaxis(dstack(tile_list), -1, 0)

    return tiled

untile_array(tiled_array, array_shape)

Untile a tiled array into a 2D numpy array

This is the reverse of tile_array and will turn a tiled array like: >>> tiled_array = [[[0,0], ... [0,0]], ... [[1,1], ... [1,1]], ... [[2,2], ... [2,2]], ... [[3,3], ... [3,3]]] >>> tiled_array.shape (4, 2, 2)

into a 2D array like

array = untile_array(tiled_array) print(array) [[0, 0, 1, 1], [0, 0, 1, 1], [2, 2, 3, 3], [2, 2, 3, 3]] array.shape (4, 4)

Parameters:

Name Type Description Default
tiled_array ndarray | MaskedArray

a tiled array

required
array_shape tuple[int, int]

shape to untile the array to. If array_shape's size is smaller than the tiled array, untile_array will subset the tiled array assuming bottom right padding was added when tiling.

required

Returns:

Type Description
ndarray | MaskedArray

the untiled array

Source code in asf_tools/tile.py
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
def untile_array(
    tiled_array: np.ndarray | np.ma.MaskedArray, array_shape: tuple[int, int]
) -> np.ndarray | np.ma.MaskedArray:
    """Untile a tiled array into a 2D numpy array

    This is the reverse of `tile_array` and will turn a tiled array like:
        >>> tiled_array = [[[0,0],
        ...                 [0,0]],
        ...                [[1,1],
        ...                 [1,1]],
        ...                [[2,2],
        ...                 [2,2]],
        ...                [[3,3],
        ...                 [3,3]]]
        >>> tiled_array.shape
        (4, 2, 2)

    into a 2D array like:
        >>> array = untile_array(tiled_array)
        >>> print(array)
        [[0, 0, 1, 1],
         [0, 0, 1, 1],
         [2, 2, 3, 3],
         [2, 2, 3, 3]]
        >>> array.shape
        (4, 4)

    Args:
        tiled_array: a tiled array
        array_shape: shape to untile the array to. If array_shape's size is smaller
            than the tiled array, `untile_array` will subset the tiled array assuming
            bottom right padding was added when tiling.

    Returns:
        the untiled array
    """
    _, tile_rows, tile_columns = tiled_array.shape
    array_rows, array_columns = array_shape

    untiled_rows = int(np.ceil(array_rows / tile_rows))
    untiled_columns = int(np.ceil(array_columns / tile_columns))

    untiled = np.zeros(
        (untiled_rows * tile_rows, untiled_columns * tile_columns),
        dtype=tiled_array.dtype,
    )

    if (array_size := array_rows * array_columns) > tiled_array.size:
        raise ValueError(
            f'array_shape {array_shape} will result in an array bigger than the tiled array:'
            f' {array_size} > {tiled_array.size}'
        )

    for ii in range(untiled_rows):
        for jj in range(untiled_columns):
            untiled[
                ii * tile_rows : (ii + 1) * tile_rows,
                jj * tile_columns : (jj + 1) * tile_columns,
            ] = tiled_array[ii * untiled_columns + jj, :, :]

    if isinstance(tiled_array, np.ma.MaskedArray):
        assert len(untiled.shape) == 2
        untiled_mask = untile_array(tiled_array.mask, untiled.shape)
        untiled = np.ma.MaskedArray(untiled, mask=untiled_mask)

    return untiled[:array_rows, :array_columns]

util

GDALConfigManager

Context manager for setting GDAL config options temporarily

Source code in asf_tools/util.py
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
class GDALConfigManager:
    """Context manager for setting GDAL config options temporarily"""

    def __init__(self, **options):
        """Args:
        **options: GDAL Config `option=value` keyword arguments.
        """
        self.options = options.copy()
        self._previous_options = {}

    def __enter__(self):
        for key in self.options:
            self._previous_options[key] = gdal.GetConfigOption(key)

        for key, value in self.options.items():
            gdal.SetConfigOption(key, value)

    def __exit__(self, exc_type, exc_val, exc_tb):
        for key, value in self._previous_options.items():
            gdal.SetConfigOption(key, value)

__init__(**options)

Args: **options: GDAL Config option=value keyword arguments.

Source code in asf_tools/util.py
10
11
12
13
14
15
def __init__(self, **options):
    """Args:
    **options: GDAL Config `option=value` keyword arguments.
    """
    self.options = options.copy()
    self._previous_options = {}

epsg_to_wkt(epsg_code)

Get the WKT representation of a projection from its EPSG code

Parameters:

Name Type Description Default
epsg_code int

The integer EPSG code

required

Returns:

Name Type Description
wkt str

The WKT representation of the projection

Source code in asf_tools/util.py
57
58
59
60
61
62
63
64
65
66
67
68
def epsg_to_wkt(epsg_code: int) -> str:
    """Get the WKT representation of a projection from its EPSG code

    Args:
        epsg_code: The integer EPSG code

    Returns:
        wkt: The WKT representation of the projection
    """
    srs = osr.SpatialReference()
    srs.ImportFromEPSG(epsg_code)
    return srs.ExportToWkt()

get_coordinates(info)

Get the corner coordinates from a GDAL Info dictionary

Parameters:

Name Type Description Default
info dict

The dictionary returned by a gdal.Info call

required

Returns:

Type Description
(west, south, east, north)

the corner coordinates values

Source code in asf_tools/util.py
43
44
45
46
47
48
49
50
51
52
53
54
def get_coordinates(info: dict) -> tuple[int, int, int, int]:
    """Get the corner coordinates from a GDAL Info dictionary

    Args:
        info: The dictionary returned by a gdal.Info call

    Returns:
        (west, south, east, north): the corner coordinates values
    """
    west, south = info['cornerCoordinates']['lowerLeft']
    east, north = info['cornerCoordinates']['upperRight']
    return west, south, east, north

get_epsg_code(info)

Get the EPSG code from a GDAL Info dictionary

Parameters:

Name Type Description Default
info dict

The dictionary returned by a gdal.Info call

required

Returns:

Name Type Description
epsg_code int

The integer EPSG code

Source code in asf_tools/util.py
29
30
31
32
33
34
35
36
37
38
39
40
def get_epsg_code(info: dict) -> int:
    """Get the EPSG code from a GDAL Info dictionary

    Args:
        info: The dictionary returned by a gdal.Info call

    Returns:
        epsg_code: The integer EPSG code
    """
    proj = osr.SpatialReference(info['coordinateSystem']['wkt'])
    epsg_code = int(proj.GetAttrValue('AUTHORITY', 1))
    return epsg_code

watermasking

generate_osm_tiles

extract_water(water_file, lat, lon, tile_width_deg, tile_height_deg, interior_tile_dir)

Rasterize a water tile from the processed global PBF file.

Parameters:

Name Type Description Default
water_file

The path to the processed global PBF file.

required
lat

The minimum latitude of the tile.

required
lon

The minimum longitude of the tile.

required
tile_width_deg

The desired width of the tile in degrees.

required
tile_height_deg

The desired height of the tile in degrees.

required
Source code in asf_tools/watermasking/generate_osm_tiles.py
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
def extract_water(water_file, lat, lon, tile_width_deg, tile_height_deg, interior_tile_dir):
    """Rasterize a water tile from the processed global PBF file.

    Args:
        water_file: The path to the processed global PBF file.
        lat: The minimum latitude of the tile.
        lon: The minimum longitude of the tile.
        tile_width_deg: The desired width of the tile in degrees.
        tile_height_deg: The desired height of the tile in degrees.
    """
    tile = lat_lon_to_tile_string(lat, lon, is_worldcover=False, postfix='')
    tile_pbf = tile + '.osm.pbf'
    tile_tif = interior_tile_dir + tile + '.tif'
    tile_shp = tile + '.shp'
    tile_geojson = tile + '.geojson'

    # Extract tile from the main pbf, then convert it to a tif.
    bbox = f'--bbox {lon},{lat},{lon + tile_width_deg},{lat + tile_height_deg}'
    extract_command = f'osmium extract -s smart -S tags=natural=water {bbox} {water_file} -o {tile_pbf}'.split(' ')
    export_command = f'osmium export --geometry-types=polygon {tile_pbf} -o {tile_geojson}'.split(' ')
    subprocess.run(extract_command)
    subprocess.run(export_command)

    # Islands and Islets can be members of the water features, so they must be removed.
    water_gdf = gpd.read_file(tile_geojson, engine='pyogrio')
    try:
        water_gdf = water_gdf.drop(water_gdf[water_gdf['place'] == 'island'].index)
        water_gdf = water_gdf.drop(water_gdf[water_gdf['place'] == 'islet'].index)
    except KeyError:
        # When there are no islands to remove, an AttributeError should throw, but we don't care about it.
        pass
    water_gdf.to_file(tile_shp, mode='w', engine='pyogrio')
    water_gdf = None

    xmin, xmax, ymin, ymax = lon, lon + tile_width_deg, lat, lat + tile_height_deg
    pixel_size_x = 0.00009009009
    pixel_size_y = 0.00009009009

    gdal.Rasterize(
        tile_tif,
        tile_shp,
        xRes=pixel_size_x,
        yRes=pixel_size_y,
        burnValues=1,
        outputBounds=[xmin, ymin, xmax, ymax],
        outputType=gdal.GDT_Byte,
        creationOptions=GDAL_OPTIONS,
    )

    temp_files = [
        tile + '.dbf',
        tile + '.cpg',
        tile + '.prj',
        tile + '.shx',
        tile_shp,
        tile_pbf,
        tile_geojson,
    ]
    remove_temp_files(temp_files)

merge_interior_and_ocean(internal_tile_dir, ocean_tile_dir, finished_tile_dir, translate_to_cog=False)

Merge the interior water tiles and ocean water tiles.

Parameters:

Name Type Description Default
interior_tile_dir

The path to the directory containing the interior water tiles.

required
ocean_tile_dir

The path to the directory containing the ocean water tiles.

required
merged_tile_dir

The path to the directory containing the merged water tiles.

required
Source code in asf_tools/watermasking/generate_osm_tiles.py
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
def merge_interior_and_ocean(internal_tile_dir, ocean_tile_dir, finished_tile_dir, translate_to_cog: bool = False):
    """Merge the interior water tiles and ocean water tiles.

    Args:
        interior_tile_dir: The path to the directory containing the interior water tiles.
        ocean_tile_dir: The path to the directory containing the ocean water tiles.
        merged_tile_dir: The path to the directory containing the merged water tiles.
    """
    index = 0
    num_tiles = len([f for f in os.listdir(internal_tile_dir) if f.endswith('tif')]) - 1
    for filename in os.listdir(internal_tile_dir):
        if filename.endswith('.tif'):
            start_time = time.time()

            internal_tile = internal_tile_dir + filename
            external_tile = ocean_tile_dir + filename
            output_tile = finished_tile_dir + filename
            command = [
                'gdal_calc.py',
                '-A',
                internal_tile,
                '-B',
                external_tile,
                '--format',
                'GTiff',
                '--outfile',
                output_tile,
                '--calc',
                '"logical_or(A, B)"',
            ]
            subprocess.run(command)

            if translate_to_cog:
                cogs_dir = finished_tile_dir + 'cogs/'
                try:
                    os.mkdir(cogs_dir)
                except FileExistsError:
                    pass
                out_file = cogs_dir + filename
                translate_string = 'gdal_translate -ot Byte -of COG -co NUM_THREADS=all_cpus'
                command = f'{translate_string} {output_tile} {out_file}'.split(' ')
                subprocess.run(command)
                os.remove(output_tile)

            end_time = time.time()
            total_time = end_time - start_time

            print(f'Elapsed Time: {total_time}(s)')
            print(f'Completed {index} of {num_tiles}')

            index += 1

process_ocean_tiles(ocean_polygons_path, lat, lon, tile_width_deg, tile_height_deg, output_dir)

Process and crop OSM ocean polygons into a tif tile.

Parameters:

Name Type Description Default
ocean_polygons_path

The path to the global ocean polygons file from OSM.

required
lat

The minimum latitude of the tile.

required
lon

The minimum longitude of the tile.

required
tile_width_deg

The width of the tile in degrees.

required
tile_height_deg

The height of the tile in degrees.

required
Source code in asf_tools/watermasking/generate_osm_tiles.py
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
def process_ocean_tiles(ocean_polygons_path, lat, lon, tile_width_deg, tile_height_deg, output_dir):
    """Process and crop OSM ocean polygons into a tif tile.

    Args:
        ocean_polygons_path: The path to the global ocean polygons file from OSM.
        lat: The minimum latitude of the tile.
        lon: The minimum longitude of the tile.
        tile_width_deg: The width of the tile in degrees.
        tile_height_deg: The height of the tile in degrees.
    """
    tile = lat_lon_to_tile_string(lat, lon, is_worldcover=False, postfix='')
    tile_tif = output_dir + tile + '.tif'

    xmin, xmax, ymin, ymax = lon, lon + tile_width_deg, lat, lat + tile_height_deg
    pixel_size_x = 0.00009009009
    pixel_size_y = 0.00009009009

    clipped_polygons_path = tile + '.shp'
    command = f'ogr2ogr -clipsrc {xmin} {ymin} {xmax} {ymax} {clipped_polygons_path} {ocean_polygons_path}'.split(' ')
    subprocess.run(command)

    gdal.Rasterize(
        tile_tif,
        clipped_polygons_path,
        xRes=pixel_size_x,
        yRes=pixel_size_y,
        burnValues=1,
        outputBounds=[xmin, ymin, xmax, ymax],
        outputType=gdal.GDT_Byte,
        creationOptions=GDAL_OPTIONS,
    )

    temp_files = [
        tile + '.dbf',
        tile + '.cpg',
        tile + '.prj',
        tile + '.shx',
        tile + '.shp',
    ]
    remove_temp_files(temp_files)

process_pbf(planet_file, output_file)

Process the global PBF file so that it only contains water features.

Parameters:

Name Type Description Default
planet_file str

The path to the OSM Planet PBF file.

required
output_file str

The desired path of the processed PBF file.

required
Source code in asf_tools/watermasking/generate_osm_tiles.py
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
def process_pbf(planet_file: str, output_file: str):
    """Process the global PBF file so that it only contains water features.

    Args:
        planet_file: The path to the OSM Planet PBF file.
        output_file: The desired path of the processed PBF file.
    """
    natural_file = 'planet_natural.pbf'
    waterways_file = 'planet_waterways.pbf'
    reservoirs_file = 'planet_reservoirs.pbf'

    natural_water_command = f'osmium tags-filter -o {natural_file} {planet_file} wr/natural=water'.split(' ')
    waterways_command = f'osmium tags-filter -o {waterways_file} {planet_file} waterway="*"'.split(' ')
    reservoirs_command = f'osmium tags-filter -o {reservoirs_file} {planet_file} landuse=reservoir'.split(' ')
    merge_command = f'osmium merge {natural_file} {waterways_file} {reservoirs_file} -o {output_file}'.split(' ')

    subprocess.run(natural_water_command)
    subprocess.run(waterways_command)
    subprocess.run(reservoirs_command)
    subprocess.run(merge_command)

generate_worldcover_tiles

build_dataset(worldcover_tile_dir, lat_range, lon_range, tile_width, tile_height)

Main function for generating a dataset with worldcover tiles.

Parameters:

Name Type Description Default
worldcover_tile_dir

The directory containing the unprocessed worldcover tiles.

required
lat_range

The range of latitudes the dataset should cover.

required
lon_range

The range of longitudes the dataset should cover.

required
out_degrees

The width of the outputed dataset tiles in degrees.

required
Source code in asf_tools/watermasking/generate_worldcover_tiles.py
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
def build_dataset(worldcover_tile_dir, lat_range, lon_range, tile_width, tile_height):
    """Main function for generating a dataset with worldcover tiles.

    Args:
        worldcover_tile_dir: The directory containing the unprocessed worldcover tiles.
        lat_range: The range of latitudes the dataset should cover.
        lon_range: The range of longitudes the dataset should cover.
        out_degrees: The width of the outputed dataset tiles in degrees.
    """
    for lat in lat_range:
        for lon in lon_range:
            start_time = time.time()
            tile = lat_lon_to_tile_string(lat, lon, is_worldcover=False)
            tile_filename = UNCROPPED_TILE_DIR + tile
            worldcover_tiles = lat_lon_to_filenames(worldcover_tile_dir, (lat, lon), WORLDCOVER_TILE_SIZE, tile_width)
            print(f'Processing: {tile_filename} {worldcover_tiles}')
            merge_tiles(worldcover_tiles, tile_filename, 'GTiff', compress=True)
            crop_tile(tile, lat, lon, tile_width, tile_height)
            end_time = time.time()
            total_time = end_time - start_time
            print(f'Time Elapsed: {total_time}s')

create_missing_tiles(tile_dir, lat_range, lon_range)

Check for, and build, tiles that may be missing from WorldCover, such as over the ocean.

Parameters:

Name Type Description Default
lat_range

The range of latitudes to check.

required
lon_range

The range of longitudes to check.

required

Returns:

Name Type Description
current_existing_tiles

The list of tiles that exist after the function has completed.

Source code in asf_tools/watermasking/generate_worldcover_tiles.py
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
def create_missing_tiles(tile_dir, lat_range, lon_range):
    """Check for, and build, tiles that may be missing from WorldCover, such as over the ocean.

    Args:
        lat_range: The range of latitudes to check.
        lon_range: The range of longitudes to check.

    Returns:
        current_existing_tiles: The list of tiles that exist after the function has completed.
    """
    current_existing_tiles = [f for f in os.listdir(tile_dir) if f.endswith(FILENAME_POSTFIX)]
    for lon in lon_range:
        for lat in lat_range:
            tile = lat_lon_to_tile_string(lat, lon, is_worldcover=True)
            print(f'Checking {tile}')
            if tile not in current_existing_tiles:
                print(f'Could not find {tile}')

                filename = PREPROCESSED_TILE_DIR + tile
                x_size, y_size = 36000, 36000
                x_res, y_res = 8.333333333333333055e-05, -8.333333333333333055e-05
                ul_lon = lon
                ul_lat = lat + WORLDCOVER_TILE_SIZE
                geotransform = (ul_lon, x_res, 0, ul_lat, 0, y_res)

                driver = gdal.GetDriverByName('GTiff')
                ds = driver.Create(
                    filename,
                    xsize=x_size,
                    ysize=y_size,
                    bands=1,
                    eType=gdal.GDT_Byte,
                    options=GDAL_OPTIONS,
                )
                ds.SetProjection('EPSG:4326')
                ds.SetGeoTransform(geotransform)
                band = ds.GetRasterBand(1)  # Write ones, as tiles should only be missing over water.
                band.WriteArray(np.ones((x_size, y_size)))

                del ds
                del band

                current_existing_tiles.append(tile)
                print(f'Added {tile}')
    return current_existing_tiles

crop_tile(tile, lat, lon, tile_width, tile_height)

Crop the merged tiles

Parameters:

Name Type Description Default
tile

The filename of the desired tile to crop.

required
Source code in asf_tools/watermasking/generate_worldcover_tiles.py
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
def crop_tile(tile, lat, lon, tile_width, tile_height):
    """Crop the merged tiles

    Args:
        tile: The filename of the desired tile to crop.
    """
    in_filename = UNCROPPED_TILE_DIR + tile
    out_filename = CROPPED_TILE_DIR + tile
    pixel_size_x, pixel_size_y = 0.00009009009, -0.00009009009

    src_ds = gdal.Open(in_filename)
    gdal.Translate(
        out_filename,
        src_ds,
        projWin=[lon, lat + tile_height, lon + tile_width, lat],
        xRes=pixel_size_x,
        yRes=pixel_size_y,
        outputSRS='EPSG:4326',
        format='COG',
        creationOptions=['NUM_THREADS=all_cpus'],
    )
    remove_temp_files(['tmp_px_size.tif', 'tmp.shp'])

get_tiles(osm_tile_coord, wc_tile_width, tile_width)

Get a list of the worldcover tile locations necessary to fully cover an OSM tile.

Parameters:

Name Type Description Default
osm_tile_coord tuple

The lower left corner coordinate (lat, lon) of the desired OSM tile.

required
wc_tile_width int

The width/height of the Worldcover tiles in degrees.

required
tile_width int

The width/height of the OSM tiles in degrees.

required

Returns:

Name Type Description
tiles

A list of the lower left corner coordinates of the Worldcover tiles that overlap the OSM tile.

Source code in asf_tools/watermasking/generate_worldcover_tiles.py
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
def get_tiles(osm_tile_coord: tuple, wc_tile_width: int, tile_width: int):
    """Get a list of the worldcover tile locations necessary to fully cover an OSM tile.

    Args:
        osm_tile_coord: The lower left corner coordinate (lat, lon) of the desired OSM tile.
        wc_tile_width: The width/height of the Worldcover tiles in degrees.
        tile_width: The width/height of the OSM tiles in degrees.

    Returns:
        tiles: A list of the lower left corner coordinates of the Worldcover tiles that overlap the OSM tile.
    """
    osm_lat = osm_tile_coord[0]
    osm_lon = osm_tile_coord[1]

    min_lat = osm_lat - (osm_lat % wc_tile_width)
    max_lat = osm_lat + tile_width
    min_lon = osm_lon - (osm_lon % wc_tile_width)
    max_lon = osm_lon + tile_width

    lats = range(min_lat, max_lat, wc_tile_width)
    lons = range(min_lon, max_lon, wc_tile_width)

    tiles = []
    for lat in lats:
        for lon in lons:
            tiles.append((lat, lon))

    return tiles

lat_lon_to_filenames(worldcover_tile_dir, osm_tile_coord, wc_tile_width, tile_width)

Get a list of the Worldcover tile filenames that are necessary to overlap an OSM tile.

Parameters:

Name Type Description Default
osm_tile

The lower left corner (lat, lon) of the desired OSM tile.

required
wc_tile_width int

The width of the Worldcover tiles in degrees.

required
tile_width int

The width of the OSM tiles in degrees.

required

Returns:

Name Type Description
filenames

The list of Worldcover filenames.

Source code in asf_tools/watermasking/generate_worldcover_tiles.py
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
def lat_lon_to_filenames(worldcover_tile_dir, osm_tile_coord: tuple, wc_tile_width: int, tile_width: int):
    """Get a list of the Worldcover tile filenames that are necessary to overlap an OSM tile.

    Args:
        osm_tile: The lower left corner (lat, lon) of the desired OSM tile.
        wc_tile_width: The width of the Worldcover tiles in degrees.
        tile_width: The width of the OSM tiles in degrees.

    Returns:
        filenames: The list of Worldcover filenames.
    """
    filenames = []
    tiles = get_tiles(osm_tile_coord, wc_tile_width, tile_width)
    for tile in tiles:
        filenames.append(worldcover_tile_dir + lat_lon_to_tile_string(tile[0], tile[1], is_worldcover=True))
    return filenames

tile_preprocessing(tile_dir, min_lat, max_lat, min_lon, max_lon)

The worldcover tiles have lots of unnecessary classes, these need to be removed first. Note: make a back-up copy of this directory.

Parameters:

Name Type Description Default
tile_dir

The directory containing all of the worldcover tiles.

required
Source code in asf_tools/watermasking/generate_worldcover_tiles.py
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
def tile_preprocessing(tile_dir, min_lat, max_lat, min_lon, max_lon):
    """The worldcover tiles have lots of unnecessary classes, these need to be removed first.
       Note: make a back-up copy of this directory.

    Args:
        tile_dir: The directory containing all of the worldcover tiles.
    """
    filenames = [f for f in os.listdir(tile_dir) if f.endswith('.tif')]

    def filename_filter(filename):
        latitude = int(filename.split('_')[5][1:3])
        longitude = int(filename.split('_')[5][4:7])
        if filename.split('_')[5][3] == 'W':
            longitude = -longitude
        mnlat = min_lat - (min_lat % WORLDCOVER_TILE_SIZE)
        mnlon = min_lon - (min_lon % WORLDCOVER_TILE_SIZE)
        mxlat = max_lat + (max_lat % WORLDCOVER_TILE_SIZE)
        mxlon = max_lon + (max_lon % WORLDCOVER_TILE_SIZE)
        in_lat_range = (latitude >= mnlat) and (latitude <= mxlat)
        in_lon_range = (longitude >= mnlon) and (longitude <= mxlon)
        return in_lat_range and in_lon_range

    filenames_filtered = [f for f in filenames if filename_filter(f)]

    index = 0
    num_tiles = len(filenames_filtered)
    for filename in filenames_filtered:
        start_time = time.time()

        tile_name = filename.split('_')[5]
        filename = str(Path(tile_dir) / filename)
        dst_filename = PREPROCESSED_TILE_DIR + tile_name + '.tif'

        print(f'Processing: {filename}  ---  {dst_filename}  -- {index} of {num_tiles}')

        src_ds = gdal.Open(filename)
        src_band = src_ds.GetRasterBand(1)
        src_arr = src_band.ReadAsArray()

        not_water = np.logical_and(src_arr != 80, src_arr != 0)
        water_arr = np.ones(src_arr.shape)
        water_arr[not_water] = 0

        driver = gdal.GetDriverByName('GTiff')
        dst_ds = driver.Create(
            dst_filename,
            water_arr.shape[0],
            water_arr.shape[1],
            1,
            gdal.GDT_Byte,
            options=GDAL_OPTIONS,
        )
        dst_ds.SetGeoTransform(src_ds.GetGeoTransform())
        dst_ds.SetProjection(src_ds.GetProjection())
        dst_band = dst_ds.GetRasterBand(1)
        dst_band.WriteArray(water_arr)
        dst_band.FlushCache()

        del dst_ds
        del src_ds

        end_time = time.time()
        total_time = end_time - start_time

        print(f'Processing {dst_filename} took {total_time} seconds.')

        index += 1

utils

lat_lon_to_tile_string(lat, lon, is_worldcover=False, postfix='.tif')

Get the name of the tile with lower left corner (lat, lon).

Parameters:

Name Type Description Default
lat

The minimum latitude of the tile.

required
lon

The minimum longitude of the tile.

required
is_worldcover bool

Wheter the tile is Worldcover or OSM.

False
postfix str

A postfix to append to the tile name to make it a filename.

'.tif'

Returns:

Type Description

The name of the tile.

Source code in asf_tools/watermasking/utils.py
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
def lat_lon_to_tile_string(lat, lon, is_worldcover: bool = False, postfix: str = '.tif'):
    """Get the name of the tile with lower left corner (lat, lon).

    Args:
        lat: The minimum latitude of the tile.
        lon: The minimum longitude of the tile.
        is_worldcover: Wheter the tile is Worldcover or OSM.
        postfix: A postfix to append to the tile name to make it a filename.

    Returns:
        The name of the tile.
    """
    prefixes = ['N', 'S', 'E', 'W'] if is_worldcover else ['n', 's', 'e', 'w']
    if lat >= 0:
        lat_part = prefixes[0] + str(int(lat)).zfill(2)
    else:
        lat_part = prefixes[1] + str(int(np.abs(lat))).zfill(2)
    if lon >= 0:
        lon_part = prefixes[2] + str(int(lon)).zfill(3)
    else:
        lon_part = prefixes[3] + str(int(np.abs(lon))).zfill(3)
    return lat_part + lon_part + postfix

merge_tiles(tiles, out_filename, out_format, compress=False)

Merge tiles via buildvrt and translate.

Parameters:

Name Type Description Default
tiles

The list of tiles to be merged.

required
out_format

The format of the output image.

required
out_filename

The name of the output COG.

required
Source code in asf_tools/watermasking/utils.py
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
def merge_tiles(tiles, out_filename, out_format, compress=False):
    """Merge tiles via buildvrt and translate.

    Args:
        tiles: The list of tiles to be merged.
        out_format: The format of the output image.
        out_filename: The name of the output COG.
    """
    vrt = 'merged.vrt'
    build_vrt_command = ['gdalbuildvrt', vrt] + tiles
    if not compress:
        translate_command = ['gdal_translate', '-of', out_format, vrt, out_filename]
    else:
        translate_command = [
            'gdal_translate',
            '-of',
            out_format,
            '-co',
            'COMPRESS=LZW',
            '-co',
            'NUM_THREADS=all_cpus',
            vrt,
            out_filename,
        ]
    subprocess.run(build_vrt_command)
    subprocess.run(translate_command)
    remove_temp_files([vrt])

remove_temp_files(temp_files)

Remove each file in a list of files.

Parameters:

Name Type Description Default
temp_files list

The list of temporary files to remove.

required
Source code in asf_tools/watermasking/utils.py
60
61
62
63
64
65
66
67
68
69
70
def remove_temp_files(temp_files: list):
    """Remove each file in a list of files.

    Args:
        temp_files: The list of temporary files to remove.
    """
    for file in temp_files:
        try:
            os.remove(file)
        except FileNotFoundError:
            print(f'Temp file {file} was not found, skipping removal...')

setup_directories(dirs)

Setup the directories necessary for running the script.

Source code in asf_tools/watermasking/utils.py
73
74
75
76
77
78
79
80
def setup_directories(dirs: list[str]):
    """Setup the directories necessary for running the script."""
    for directory in dirs:
        try:
            os.mkdir(directory)
        except FileExistsError:
            # Directories already exists.
            pass