asf_tools
v0.4.0 API Reference¶
Tools developed by ASF for working with SAR data
composite
¶
Create a local-resolution-weighted composite from Sentinel-1 RTC products.
Create a local-resolution-weighted composite from a set of Sentinel-1 RTC products (D. Small, 2012). The local resolution, defined as the inverse of the local contributing (scattering) area, is used to weight each RTC products' contributions to the composite image on a pixel-by-pixel basis. The composite image is created as a Cloud Optimized GeoTIFF (COG). Additionally, a COG specifying the number of rasters contributing to each composite pixel is created.
References
David Small, 2012: https://doi.org/10.1109/IGARSS.2012.6350465
epsg_to_wkt(epsg_code)
¶
Get the WKT representation of a projection from its EPSG code
Parameters:
Name | Type | Description | Default |
---|---|---|---|
epsg_code |
int |
The integer EPSG code |
required |
Returns:
Type | Description |
---|---|
wkt |
The WKT representation of the projection |
Source code in asf_tools/composite.py
def epsg_to_wkt(epsg_code: int) -> str:
"""Get the WKT representation of a projection from its EPSG code
Args:
epsg_code: The integer EPSG code
Returns:
wkt: The WKT representation of the projection
"""
srs = osr.SpatialReference()
srs.ImportFromEPSG(epsg_code)
return srs.ExportToWkt()
get_area_raster(raster)
¶
Determine the path of the area raster for a given backscatter raster based on naming conventions for HyP3 RTC products
Parameters:
Name | Type | Description | Default |
---|---|---|---|
raster |
str |
path of the backscatter raster, e.g. S1A_IW_20181102T155531_DVP_RTC30_G_gpuned_5685_VV.tif |
required |
Returns:
Type | Description |
---|---|
area_raster |
path of the area raster, e.g. S1A_IW_20181102T155531_DVP_RTC30_G_gpuned_5685_area.tif |
Source code in asf_tools/composite.py
def get_area_raster(raster: str) -> str:
"""Determine the path of the area raster for a given backscatter raster based on naming conventions for HyP3 RTC
products
Args:
raster: path of the backscatter raster, e.g. S1A_IW_20181102T155531_DVP_RTC30_G_gpuned_5685_VV.tif
Returns:
area_raster: path of the area raster, e.g. S1A_IW_20181102T155531_DVP_RTC30_G_gpuned_5685_area.tif
"""
return '_'.join(raster.split('_')[:-1] + ['area.tif'])
get_epsg_code(info)
¶
Get the EPSG code from a GDAL Info dictionary
Parameters:
Name | Type | Description | Default |
---|---|---|---|
info |
dict |
The dictionary returned by a gdal.Info call |
required |
Returns:
Type | Description |
---|---|
epsg_code |
The integer EPSG code |
Source code in asf_tools/composite.py
def get_epsg_code(info: dict) -> int:
"""Get the EPSG code from a GDAL Info dictionary
Args:
info: The dictionary returned by a gdal.Info call
Returns:
epsg_code: The integer EPSG code
"""
proj = osr.SpatialReference(info['coordinateSystem']['wkt'])
epsg_code = int(proj.GetAttrValue('AUTHORITY', 1))
return epsg_code
get_full_extent(raster_info)
¶
Determine the corner coordinates and geotransform for the full extent of a set of rasters
Parameters:
Name | Type | Description | Default |
---|---|---|---|
raster_info |
dict |
A dictionary of gdal.Info results for the set of rasters |
required |
Returns:
Type | Description |
---|---|
upper_left |
The upper left corner of the extent as a tuple upper_right: The lower right corner of the extent as a tuple geotransform: The geotransform of the extent as a list |
Source code in asf_tools/composite.py
def get_full_extent(raster_info: dict):
"""Determine the corner coordinates and geotransform for the full extent of a set of rasters
Args:
raster_info: A dictionary of gdal.Info results for the set of rasters
Returns:
upper_left: The upper left corner of the extent as a tuple
upper_right: The lower right corner of the extent as a tuple
geotransform: The geotransform of the extent as a list
"""
upper_left_corners = [info['cornerCoordinates']['upperLeft'] for info in raster_info.values()]
lower_right_corners = [info['cornerCoordinates']['lowerRight'] for info in raster_info.values()]
ulx = min([ul[0] for ul in upper_left_corners])
uly = max([ul[1] for ul in upper_left_corners])
lrx = max([lr[0] for lr in lower_right_corners])
lry = min([lr[1] for lr in lower_right_corners])
log.debug(f'Full extent raster upper left: ({ulx, uly}); lower right: ({lrx, lry})')
trans = []
for info in raster_info.values():
# Only need info from any one raster
trans = info['geoTransform']
break
trans[0] = ulx
trans[3] = uly
return (ulx, uly), (lrx, lry), trans
get_target_epsg_code(codes)
¶
Determine the target UTM EPSG projection for the output composite
Parameters:
Name | Type | Description | Default |
---|---|---|---|
codes |
List[int] |
List of UTM EPSG codes |
required |
Returns:
Type | Description |
---|---|
target |
UTM EPSG code |
Source code in asf_tools/composite.py
def get_target_epsg_code(codes: List[int]) -> int:
"""Determine the target UTM EPSG projection for the output composite
Args:
codes: List of UTM EPSG codes
Returns:
target: UTM EPSG code
"""
# use median east/west UTM zone of all files, regardless of hemisphere
# UTM EPSG codes for each hemisphere will look like:
# North: 326XX
# South: 327XX
valid_codes = list(range(32601, 32661)) + list(range(32701, 32761))
if bad_codes := set(codes) - set(valid_codes):
raise ValueError(f'Non UTM EPSG code encountered: {bad_codes}')
hemispheres = [c // 100 * 100 for c in codes]
# if even modes, choose lowest (North)
target_hemisphere = min(multimode(hemispheres))
zones = sorted([c % 100 for c in codes])
# if even length, choose fist of median two
target_zone = zones[(len(zones) - 1) // 2]
return target_hemisphere + target_zone
make_composite(out_name, rasters, resolution=None)
¶
Creates a local-resolution-weighted composite from Sentinel-1 RTC products
Parameters:
Name | Type | Description | Default |
---|---|---|---|
out_name |
str |
The base name of the output GeoTIFFs |
required |
rasters |
List[str] |
A list of file paths of the images to composite |
required |
resolution |
float |
The pixel size for the output GeoTIFFs |
None |
Returns:
Type | Description |
---|---|
out_raster |
Path to the created composite backscatter GeoTIFF out_counts_raster: Path to the created GeoTIFF with counts of scenes contributing to each pixel |
Source code in asf_tools/composite.py
def make_composite(out_name: str, rasters: List[str], resolution: float = None):
"""Creates a local-resolution-weighted composite from Sentinel-1 RTC products
Args:
out_name: The base name of the output GeoTIFFs
rasters: A list of file paths of the images to composite
resolution: The pixel size for the output GeoTIFFs
Returns:
out_raster: Path to the created composite backscatter GeoTIFF
out_counts_raster: Path to the created GeoTIFF with counts of scenes contributing to each pixel
"""
if not rasters:
raise ValueError('Must specify at least one raster to composite')
raster_info = {}
for raster in rasters:
raster_info[raster] = gdal.Info(raster, format='json')
# make sure gdal can read the area raster
gdal.Info(get_area_raster(raster))
target_epsg_code = get_target_epsg_code([get_epsg_code(info) for info in raster_info.values()])
log.debug(f'Composite projection is EPSG:{target_epsg_code}')
if resolution is None:
resolution = max([info['geoTransform'][1] for info in raster_info.values()])
log.debug(f'Composite resolution is {resolution} meters')
# resample rasters to maximum resolution & common UTM zone
with TemporaryDirectory(prefix='reprojected_') as temp_dir:
raster_info = reproject_to_target(raster_info, target_epsg_code=target_epsg_code, target_resolution=resolution,
directory=temp_dir)
# Get extent of union of all images
full_ul, full_lr, full_trans = get_full_extent(raster_info)
nx = int(abs(full_ul[0] - full_lr[0]) // resolution)
ny = int(abs(full_ul[1] - full_lr[1]) // resolution)
outputs = np.zeros((ny, nx))
weights = np.zeros(outputs.shape)
counts = np.zeros(outputs.shape, dtype=np.int8)
for raster, info in raster_info.items():
log.info(f'Processing raster {raster}')
log.debug(f"Raster upper left: {info['cornerCoordinates']['upperLeft']}; "
f"lower right: {info['cornerCoordinates']['lowerRight']}")
values = read_as_array(raster)
area_raster = get_area_raster(raster)
areas = read_as_array(area_raster)
ulx, uly = info['cornerCoordinates']['upperLeft']
y_index_start = int((full_ul[1] - uly) // resolution)
y_index_end = y_index_start + values.shape[0]
x_index_start = int((ulx - full_ul[0]) // resolution)
x_index_end = x_index_start + values.shape[1]
log.debug(
f'Placing values in output grid at {y_index_start}:{y_index_end} and {x_index_start}:{x_index_end}'
)
mask = values == 0
raster_weights = 1.0 / areas
raster_weights[mask] = 0
outputs[y_index_start:y_index_end, x_index_start:x_index_end] += values * raster_weights
weights[y_index_start:y_index_end, x_index_start:x_index_end] += raster_weights
counts[y_index_start:y_index_end, x_index_start:x_index_end] += ~mask
del values, areas, mask, raster_weights
# Divide by the total weight applied
outputs /= weights
del weights
out_raster = write_cog(f'{out_name}.tif', outputs, full_trans, target_epsg_code, nodata_value=0)
del outputs
out_counts_raster = write_cog(f'{out_name}_counts.tif', counts, full_trans, target_epsg_code, dtype=gdal.GDT_Int16)
del counts
return out_raster, out_counts_raster
read_as_array(raster, band=1)
¶
Reads data from a raster image into memory
Parameters:
Name | Type | Description | Default |
---|---|---|---|
raster |
str |
The file path to a raster image |
required |
band |
int |
The raster band to read |
1 |
Returns:
Type | Description |
---|---|
data |
The raster pixel data as a numpy array |
Source code in asf_tools/composite.py
def read_as_array(raster: str, band: int = 1) -> np.array:
"""Reads data from a raster image into memory
Args:
raster: The file path to a raster image
band: The raster band to read
Returns:
data: The raster pixel data as a numpy array
"""
log.debug(f'Reading raster values from {raster}')
ds = gdal.Open(raster)
data = ds.GetRasterBand(band).ReadAsArray()
del ds # How to close w/ gdal
return data
reproject_to_target(raster_info, target_epsg_code, target_resolution, directory)
¶
Reprojects a set of raster images to a common projection and resolution
Parameters:
Name | Type | Description | Default |
---|---|---|---|
raster_info |
dict |
A dictionary of gdal.Info results for the set of rasters |
required |
target_epsg_code |
int |
The integer EPSG code for the target projection |
required |
target_resolution |
float |
The target resolution |
required |
directory |
str |
The directory in which to create the reprojected files |
required |
Returns:
Type | Description |
---|---|
target_raster_info |
An updated dictionary of gdal.Info results for the reprojected files |
Source code in asf_tools/composite.py
def reproject_to_target(raster_info: dict, target_epsg_code: int, target_resolution: float, directory: str) -> dict:
"""Reprojects a set of raster images to a common projection and resolution
Args:
raster_info: A dictionary of gdal.Info results for the set of rasters
target_epsg_code: The integer EPSG code for the target projection
target_resolution: The target resolution
directory: The directory in which to create the reprojected files
Returns:
target_raster_info: An updated dictionary of gdal.Info results for the reprojected files
"""
target_raster_info = {}
for raster, info in raster_info.items():
epsg_code = get_epsg_code(info)
resolution = info['geoTransform'][1]
if epsg_code != target_epsg_code or resolution != target_resolution:
log.info(f'Reprojecting {raster}')
reprojected_raster = os.path.join(directory, os.path.basename(raster))
gdal.Warp(
reprojected_raster, raster, dstSRS=f'EPSG:{target_epsg_code}',
xRes=target_resolution, yRes=target_resolution, targetAlignedPixels=True
)
area_raster = get_area_raster(raster)
log.info(f'Reprojecting {area_raster}')
reprojected_area_raster = os.path.join(directory, os.path.basename(area_raster))
gdal.Warp(
reprojected_area_raster, area_raster, dstSRS=f'EPSG:{target_epsg_code}',
xRes=target_resolution, yRes=target_resolution, targetAlignedPixels=True
)
target_raster_info[reprojected_raster] = gdal.Info(reprojected_raster, format='json')
else:
log.info(f'No need to reproject {raster}')
target_raster_info[raster] = info
return target_raster_info
write_cog(file_name, data, transform, epsg_code, dtype=6, nodata_value=None)
¶
Creates a Cloud Optimized GeoTIFF
Parameters:
Name | Type | Description | Default |
---|---|---|---|
file_name |
Union[str, pathlib.Path] |
The output file name |
required |
data |
ndarray |
The raster data |
required |
transform |
List[float] |
The geotransform for the output GeoTIFF |
required |
epsg_code |
int |
The integer EPSG code for the output GeoTIFF projection |
required |
dtype |
The pixel data type for the output GeoTIFF |
6 |
|
nodata_value |
The NODATA value for the output Geotiff |
None |
Returns:
Type | Description |
---|---|
file_name |
The output file name |
Source code in asf_tools/composite.py
def write_cog(file_name: Union[str, Path], data: np.ndarray, transform: List[float], epsg_code: int,
dtype=gdal.GDT_Float32, nodata_value=None):
"""Creates a Cloud Optimized GeoTIFF
Args:
file_name: The output file name
data: The raster data
transform: The geotransform for the output GeoTIFF
epsg_code: The integer EPSG code for the output GeoTIFF projection
dtype: The pixel data type for the output GeoTIFF
nodata_value: The NODATA value for the output Geotiff
Returns:
file_name: The output file name
"""
log.info(f'Creating {file_name}')
with NamedTemporaryFile() as temp_file:
driver = gdal.GetDriverByName('GTiff')
temp_geotiff = driver.Create(temp_file.name, data.shape[1], data.shape[0], 1, dtype)
temp_geotiff.GetRasterBand(1).WriteArray(data)
if nodata_value is not None:
temp_geotiff.GetRasterBand(1).SetNoDataValue(nodata_value)
temp_geotiff.SetGeoTransform(transform)
temp_geotiff.SetProjection(epsg_to_wkt(epsg_code))
driver = gdal.GetDriverByName('COG')
options = ['COMPRESS=LZW', 'OVERVIEW_RESAMPLING=AVERAGE', 'NUM_THREADS=ALL_CPUS', 'BIGTIFF=YES']
driver.CreateCopy(str(file_name), temp_geotiff, options=options)
del temp_geotiff # How to close w/ gdal
return file_name
dem
¶
Prepare a Copernicus GLO-30 DEM virtual raster (VRT) covering a given geometry
prepare_dem_vrt(vrt, geometry)
¶
Create a DEM mosaic VRT covering a given geometry
The DEM mosaic is assembled from the Copernicus GLO-30 DEM tiles that intersect the geometry.
Note: asf_tools
does not currently support geometries that cross the antimeridian.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
vrt |
Union[str, pathlib.Path] |
Path for the output VRT file |
required |
geometry |
Union[osgeo.ogr.Geometry, shapely.geometry.base.BaseGeometry] |
Geometry in EPSG:4326 (lon/lat) projection for which to prepare a DEM mosaic |
required |
Source code in asf_tools/dem.py
def prepare_dem_vrt(vrt: Union[str, Path], geometry: Union[ogr.Geometry, BaseGeometry]):
"""Create a DEM mosaic VRT covering a given geometry
The DEM mosaic is assembled from the Copernicus GLO-30 DEM tiles that intersect the geometry.
Note: `asf_tools` does not currently support geometries that cross the antimeridian.
Args:
vrt: Path for the output VRT file
geometry: Geometry in EPSG:4326 (lon/lat) projection for which to prepare a DEM mosaic
"""
with GDALConfigManager(GDAL_DISABLE_READDIR_ON_OPEN='EMPTY_DIR'):
if isinstance(geometry, BaseGeometry):
geometry = ogr.CreateGeometryFromWkb(geometry.wkb)
min_lon, max_lon, _, _ = geometry.GetEnvelope()
if min_lon < -160. and max_lon > 160.:
raise ValueError(f'asf_tools does not currently support geometries that cross the antimeridian: {geometry}')
tile_features = vector.get_features(DEM_GEOJSON)
if not vector.get_property_values_for_intersecting_features(geometry, tile_features):
raise ValueError(f'Copernicus GLO-30 DEM does not intersect this geometry: {geometry}')
dem_file_paths = vector.intersecting_feature_properties(geometry, tile_features, 'file_path')
gdal.BuildVRT(str(vrt), dem_file_paths)
flood_map
¶
Generate flood depth map from surface water extent map.
Create a flood depth map from a surface water extent map and a HAND image. The HAND image must be pixel-aligned (same extent and size) to the water extent map, and the surface water extent map should be a byte GeoTIFF indicating water (true), not water (false). Flood depth maps are estimated using either a numerical, normalized median absolute deviation, logarithmic or iterative approach.
logstat(data, func=<function nanstd at 0x7f6cdd74a5e0>)
¶
Calculate a function in logarithmic scale and return in linear scale. INF values inside the data array are set to nan.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data |
ndarray |
array of data |
required |
func |
Callable |
statistical function to calculate in logarithmic scale |
<function nanstd at 0x7f6cdd74a5e0> |
Returns:
Type | Description |
---|---|
statistic |
statistic of data in linear scale |
Source code in asf_tools/flood_map.py
def logstat(data: np.ndarray, func: Callable = np.nanstd) -> Union[np.ndarray, float]:
""" Calculate a function in logarithmic scale and return in linear scale.
INF values inside the data array are set to nan.
Args:
data: array of data
func: statistical function to calculate in logarithmic scale
Returns:
statistic: statistic of data in linear scale
"""
ld = np.log(data)
ld[np.isinf(ld)] = np.nan
st = func(ld)
return np.exp(st)
make_flood_map(out_raster, water_raster, hand_raster, estimator='iterative', water_level_sigma=3.0, known_water_threshold=30.0, iterative_bounds=(0, 15))
¶
Create a flood depth map from a surface water extent map.
WARNING: This functionality is still under active development and the products created using this function are likely to change in the future.
Create a flood depth map from a single surface water extent map and a HAND image. The HAND image must be pixel-aligned to the surface water extent map. The the surface water extent map should be a byte GeoTIFF indicating water (true) and not water (false)
Known perennial Global Surface-water data are produced under the Copernicus Programme (Pekel et al., 2016), and are included with surface-water detection maps when generating the flood depth product.
Flood depth maps are estimated using one of the approaches: Iterative: (Default) Basin hopping optimization method matches flooded areas to flood depth estimates given by the HAND layer. This is the most accurate method but also the most time-intensive. Normalized Median Absolute Deviation (nmad): Uses a median operator to estimate the variation to increase robustness in the presence of outliers. Logstat: Calculates the mean and standard deviation of HAND heights in the logarithmic domain to improve robustness for very non-Gaussian data distributions. Numpy: Calculates statistics on a linear scale. Least robust to outliers and non-Gaussian distributions.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
out_raster |
Union[str, pathlib.Path] |
Flood depth GeoTIFF to create |
required |
water_raster |
Union[str, pathlib.Path] |
Surface water extent GeoTIFF |
required |
hand_raster |
Union[str, pathlib.Path] |
Height Above Nearest Drainage (HAND) GeoTIFF aligned to the surface water extent raster |
required |
estimator |
str |
Estimation approach for determining flood depth |
'iterative' |
water_level_sigma |
float |
Max water height used in logstat, nmad, and numpy estimations |
3.0 |
known_water_threshold |
float |
Threshold for extracting the known water area in percent |
30.0 |
iterative_bounds |
Tuple[int, int] |
Bounds on basin-hopping algorithm used in iterative estimation |
(0, 15) |
References
Jean-Francios Pekel, Andrew Cottam, Noel Gorelik, Alan S. Belward. 2016. https://doi:10.1038/nature20584
Source code in asf_tools/flood_map.py
def make_flood_map(out_raster: Union[str, Path], water_raster: Union[str, Path],
hand_raster: Union[str, Path], estimator: str = 'iterative',
water_level_sigma: float = 3.,
known_water_threshold: float = 30.,
iterative_bounds: Tuple[int, int] = (0, 15)):
"""Create a flood depth map from a surface water extent map.
WARNING: This functionality is still under active development and the products
created using this function are likely to change in the future.
Create a flood depth map from a single surface water extent map and
a HAND image. The HAND image must be pixel-aligned to the surface water extent map.
The the surface water extent map should be a byte GeoTIFF indicating water (true) and
not water (false)
Known perennial Global Surface-water data are produced under the Copernicus Programme (Pekel et al., 2016),
and are included with surface-water detection maps when generating the flood depth product.
Flood depth maps are estimated using one of the approaches:
*Iterative: (Default) Basin hopping optimization method matches flooded areas to flood depth
estimates given by the HAND layer. This is the most accurate method but also the
most time-intensive.
*Normalized Median Absolute Deviation (nmad): Uses a median operator to estimate
the variation to increase robustness in the presence of outliers.
*Logstat: Calculates the mean and standard deviation of HAND heights in the logarithmic
domain to improve robustness for very non-Gaussian data distributions.
*Numpy: Calculates statistics on a linear scale. Least robust to outliers and non-Gaussian
distributions.
Args:
out_raster: Flood depth GeoTIFF to create
water_raster: Surface water extent GeoTIFF
hand_raster: Height Above Nearest Drainage (HAND) GeoTIFF aligned to the surface water extent raster
estimator: Estimation approach for determining flood depth
water_level_sigma: Max water height used in logstat, nmad, and numpy estimations
known_water_threshold: Threshold for extracting the known water area in percent
iterative_bounds: Bounds on basin-hopping algorithm used in iterative estimation
References:
Jean-Francios Pekel, Andrew Cottam, Noel Gorelik, Alan S. Belward. 2016. <https://doi:10.1038/nature20584>
"""
info = gdal.Info(str(water_raster), format='json')
epsg = get_epsg_code(info)
geotransform = info['geoTransform']
hand_array = gdal.Open(str(hand_raster), gdal.GA_ReadOnly).ReadAsArray()
log.info('Fetching perennial flood data.')
known_water_mask = get_waterbody(info, threshold=known_water_threshold)
water_map = gdal.Open(water_raster).ReadAsArray()
flood_mask = np.bitwise_or(water_map, known_water_mask)
labeled_flood_mask, num_labels = ndimage.label(flood_mask)
object_slices = ndimage.find_objects(labeled_flood_mask)
log.info(f'Detected {num_labels} water bodies...')
flood_depth = np.zeros(flood_mask.shape)
for ll in range(1, num_labels): # Skip first, largest label.
slices = object_slices[ll - 1]
min0, max0 = slices[0].start, slices[0].stop
min1, max1 = slices[1].start, slices[1].stop
flood_window = labeled_flood_mask[min0:max0, min1:max1]
hand_window = hand_array[min0:max0, min1:max1]
water_height = estimate_flood_depth(ll, hand_window, flood_window, estimator=estimator,
water_level_sigma=water_level_sigma, iterative_bounds=iterative_bounds)
flood_depth_window = flood_depth[min0:max0, min1:max1]
flood_depth_window[flood_window == ll] = water_height - hand_window[flood_window == ll]
flood_depth[flood_depth < 0] = 0
write_cog(str(out_raster).replace('.tif', f'_{estimator}_WaterDepth.tif'), flood_depth, transform=geotransform,
epsg_code=epsg, dtype=gdal.GDT_Float64, nodata_value=False)
write_cog(str(out_raster).replace('.tif', f'_{estimator}_FloodMask.tif'), flood_mask, transform=geotransform,
epsg_code=epsg, dtype=gdal.GDT_Byte, nodata_value=False)
flood_mask[known_water_mask] = 0
flood_depth[np.bitwise_not(flood_mask)] = 0
write_cog(str(out_raster).replace('.tif', f'_{estimator}_FloodDepth.tif'), flood_depth, transform=geotransform,
epsg_code=epsg, dtype=gdal.GDT_Float64, nodata_value=False)
hand
special
¶
calculate
¶
Calculate Height Above Nearest Drainage (HAND) from the Copernicus GLO-30 DEM
calculate_hand(dem_array, dem_affine, dem_crs, basin_mask, acc_thresh=100)
¶
Calculate the Height Above Nearest Drainage (HAND)
Calculate the Height Above Nearest Drainage (HAND) using pySHEDS library. Because HAND is tied to watershed boundaries (hydrobasins), clipped/cut basins will produce weird edge effects, and incomplete basins should be masked out. For watershed boundaries, see: https://www.hydrosheds.org/page/hydrobasins
This involves:
* Filling depressions (regions of cells lower than their surrounding neighbors)
in the Digital Elevation Model (DEM)
* Resolving un-drainable flats
* Determine the flow direction using the ESRI D8 routing scheme
* Determine flow accumulation (number of upstream cells)
* Create a drainage mask using the accumulation threshold acc_thresh
* Calculating HAND
In the HAND calculation, NaNs inside the basin filled using fill_nan
Parameters:
Name | Type | Description | Default |
---|---|---|---|
dem_array |
DEM to calculate HAND for |
required | |
dem_crs |
CRS |
DEM Coordinate Reference System (CRS) |
required |
dem_affine |
Affine |
DEM Affine geotransform |
required |
basin_mask |
Array of booleans indicating wither an element should be masked out (Ă la Numpy Masked Arrays: https://numpy.org/doc/stable/reference/maskedarray.generic.html#what-is-a-masked-array) |
required | |
acc_thresh |
Optional[int] |
Accumulation threshold for determining the drainage mask.
If |
100 |
Source code in asf_tools/hand/calculate.py
def calculate_hand(dem_array, dem_affine: rasterio.Affine, dem_crs: rasterio.crs.CRS, basin_mask,
acc_thresh: Optional[int] = 100):
"""Calculate the Height Above Nearest Drainage (HAND)
Calculate the Height Above Nearest Drainage (HAND) using pySHEDS library. Because HAND
is tied to watershed boundaries (hydrobasins), clipped/cut basins will produce weird edge
effects, and incomplete basins should be masked out. For watershed boundaries,
see: https://www.hydrosheds.org/page/hydrobasins
This involves:
* Filling depressions (regions of cells lower than their surrounding neighbors)
in the Digital Elevation Model (DEM)
* Resolving un-drainable flats
* Determine the flow direction using the ESRI D8 routing scheme
* Determine flow accumulation (number of upstream cells)
* Create a drainage mask using the accumulation threshold `acc_thresh`
* Calculating HAND
In the HAND calculation, NaNs inside the basin filled using `fill_nan`
Args:
dem_array: DEM to calculate HAND for
dem_crs: DEM Coordinate Reference System (CRS)
dem_affine: DEM Affine geotransform
basin_mask: Array of booleans indicating wither an element should be masked out (Ă la Numpy Masked Arrays:
https://numpy.org/doc/stable/reference/maskedarray.generic.html#what-is-a-masked-array)
acc_thresh: Accumulation threshold for determining the drainage mask.
If `None`, the mean accumulation value is used
"""
grid = Pgrid()
grid.add_gridded_data(dem_array, data_name='dem', affine=dem_affine, crs=dem_crs.to_dict(), mask=~basin_mask)
log.info('Filling depressions')
grid.fill_depressions('dem', out_name='flooded_dem')
if np.isnan(grid.flooded_dem).any():
log.debug('NaNs encountered in flooded DEM; filling.')
grid.flooded_dem = fill_nan(grid.flooded_dem)
log.info('Resolving flats')
grid.resolve_flats('flooded_dem', out_name='inflated_dem')
if np.isnan(grid.inflated_dem).any():
log.debug('NaNs encountered in inflated DEM; replacing NaNs with original DEM values')
grid.inflated_dem[np.isnan(grid.inflated_dem)] = dem_array[np.isnan(grid.inflated_dem)]
log.info('Obtaining flow direction')
grid.flowdir(data='inflated_dem', out_name='dir', apply_mask=True)
if np.isnan(grid.dir).any():
log.debug('NaNs encountered in flow direction; filling.')
grid.dir = fill_nan(grid.dir)
log.info('Calculating flow accumulation')
grid.accumulation(data='dir', out_name='acc')
if np.isnan(grid.acc).any():
log.debug('NaNs encountered in accumulation; filling.')
grid.acc = fill_nan(grid.acc)
if acc_thresh is None:
acc_thresh = grid.acc.mean()
log.info(f'Calculating HAND using accumulation threshold of {acc_thresh}')
hand = grid.compute_hand('dir', 'inflated_dem', grid.acc > acc_thresh, inplace=False)
if np.isnan(hand).any():
log.debug('NaNs encountered in HAND; filling.')
hand = fill_nan(hand)
# ensure non-basin is masked after fill_nan
hand[basin_mask] = np.nan
return hand
calculate_hand_for_basins(out_raster, geometries, dem_file)
¶
Calculate the Height Above Nearest Drainage (HAND) for watershed boundaries (hydrobasins).
For watershed boundaries, see: https://www.hydrosheds.org/page/hydrobasins
Parameters:
Name | Type | Description | Default |
---|---|---|---|
out_raster |
Union[str, pathlib.Path] |
HAND GeoTIFF to create |
required |
geometries |
GeometryCollection |
watershed boundary (hydrobasin) polygons to calculate HAND over |
required |
dem_file |
Union[str, pathlib.Path] |
DEM raster covering (containing) |
required |
Source code in asf_tools/hand/calculate.py
def calculate_hand_for_basins(out_raster: Union[str, Path], geometries: GeometryCollection,
dem_file: Union[str, Path]):
"""Calculate the Height Above Nearest Drainage (HAND) for watershed boundaries (hydrobasins).
For watershed boundaries, see: https://www.hydrosheds.org/page/hydrobasins
Args:
out_raster: HAND GeoTIFF to create
geometries: watershed boundary (hydrobasin) polygons to calculate HAND over
dem_file: DEM raster covering (containing) `geometries`
"""
with rasterio.open(dem_file) as src:
basin_mask, basin_affine_tf, basin_window = rasterio.mask.raster_geometry_mask(
src, geometries, all_touched=True, crop=True, pad=True, pad_width=1
)
basin_array = src.read(1, window=basin_window)
hand = calculate_hand(basin_array, basin_affine_tf, src.crs, basin_mask)
write_cog(str(out_raster), hand, transform=basin_affine_tf.to_gdal(), epsg_code=src.crs.to_epsg())
fill_nan(array)
¶
Replace NaNs with values interpolated from their neighbors
Replace NaNs with values interpolated from their neighbors using a 2D Gaussian kernel, see: https://docs.astropy.org/en/stable/convolution/#using-astropy-s-convolution-to-replace-bad-data
Source code in asf_tools/hand/calculate.py
def fill_nan(array: np.ndarray) -> np.ndarray:
"""Replace NaNs with values interpolated from their neighbors
Replace NaNs with values interpolated from their neighbors using a 2D Gaussian
kernel, see: https://docs.astropy.org/en/stable/convolution/#using-astropy-s-convolution-to-replace-bad-data
"""
kernel = astropy.convolution.Gaussian2DKernel(x_stddev=3) # kernel x_size=8*stddev
with warnings.catch_warnings():
warnings.simplefilter("ignore")
array = astropy.convolution.interpolate_replace_nans(
array, kernel, convolve=astropy.convolution.convolve
)
return array
make_copernicus_hand(out_raster, vector_file)
¶
Copernicus GLO-30 Height Above Nearest Drainage (HAND)
Make a Height Above Nearest Drainage (HAND) GeoTIFF from the Copernicus GLO-30 DEM covering the watershed boundaries (hydrobasins) defined in a vector file.
For watershed boundaries, see: https://www.hydrosheds.org/page/hydrobasins
Parameters:
Name | Type | Description | Default |
---|---|---|---|
out_raster |
Union[str, pathlib.Path] |
HAND GeoTIFF to create |
required |
vector_file |
Union[str, pathlib.Path] |
Vector file of watershed boundary (hydrobasin) polygons to calculate HAND over |
required |
Source code in asf_tools/hand/calculate.py
def make_copernicus_hand(out_raster: Union[str, Path], vector_file: Union[str, Path]):
"""Copernicus GLO-30 Height Above Nearest Drainage (HAND)
Make a Height Above Nearest Drainage (HAND) GeoTIFF from the Copernicus GLO-30 DEM
covering the watershed boundaries (hydrobasins) defined in a vector file.
For watershed boundaries, see: https://www.hydrosheds.org/page/hydrobasins
Args:
out_raster: HAND GeoTIFF to create
vector_file: Vector file of watershed boundary (hydrobasin) polygons to calculate HAND over
"""
with fiona.open(vector_file) as vds:
geometries = GeometryCollection([shape(feature['geometry']) for feature in vds])
with NamedTemporaryFile(suffix='.vrt', delete=False) as dem_vrt:
prepare_dem_vrt(dem_vrt.name, geometries)
calculate_hand_for_basins(out_raster, geometries, dem_vrt.name)
prepare
¶
Prepare a Height Above Nearest Drainage (HAND) virtual raster (VRT) covering a given geometry
prepare_hand_for_raster(hand_raster, source_raster, resampling_method='lanczos')
¶
Create a HAND raster pixel-aligned to a source raster
Parameters:
Name | Type | Description | Default |
---|---|---|---|
hand_raster |
Union[str, pathlib.Path] |
Path for the output HAND raster |
required |
source_raster |
Union[str, pathlib.Path] |
Path for the source raster |
required |
resampling_method |
str |
Name of the resampling method to use. For available methods, see: https://gdal.org/programs/gdalwarp.html#cmdoption-gdalwarp-r |
'lanczos' |
Source code in asf_tools/hand/prepare.py
def prepare_hand_for_raster(hand_raster: Union[str, Path], source_raster: Union[str, Path],
resampling_method: str = 'lanczos'):
"""Create a HAND raster pixel-aligned to a source raster
Args:
hand_raster: Path for the output HAND raster
source_raster: Path for the source raster
resampling_method: Name of the resampling method to use. For available methods, see:
https://gdal.org/programs/gdalwarp.html#cmdoption-gdalwarp-r
"""
info = gdal.Info(str(source_raster), format='json')
hand_geometry = shape(info['wgs84Extent'])
hand_bounds = [info['cornerCoordinates']['upperLeft'][0],
info['cornerCoordinates']['lowerRight'][1],
info['cornerCoordinates']['lowerRight'][0],
info['cornerCoordinates']['upperLeft'][1]]
with NamedTemporaryFile(suffix='.vrt', delete=False) as hand_vrt:
prepare_hand_vrt(hand_vrt.name, hand_geometry)
gdal.Warp(str(hand_raster), hand_vrt.name, dstSRS=f'EPSG:{get_epsg_code(info)}',
outputBounds=hand_bounds, width=info['size'][0], height=info['size'][1],
resampleAlg=Resampling[resampling_method].value)
prepare_hand_vrt(vrt, geometry)
¶
Prepare a HAND mosaic VRT covering a given geometry
Prepare a Height Above Nearest Drainage (HAND) virtual raster (VRT) covering a given geometry. The Height Above Nearest Drainage (HAND) mosaic is assembled from the HAND tiles that intersect the geometry, using a HAND derived from the Copernicus GLO-30 DEM.
Note: asf_tools
does not currently support geometries that cross the antimeridian.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
vrt |
Union[str, pathlib.Path] |
Path for the output VRT file |
required |
geometry |
Union[osgeo.ogr.Geometry, shapely.geometry.base.BaseGeometry] |
Geometry in EPSG:4326 (lon/lat) projection for which to prepare a HAND mosaic |
required |
Source code in asf_tools/hand/prepare.py
def prepare_hand_vrt(vrt: Union[str, Path], geometry: Union[ogr.Geometry, BaseGeometry]):
"""Prepare a HAND mosaic VRT covering a given geometry
Prepare a Height Above Nearest Drainage (HAND) virtual raster (VRT) covering a given geometry.
The Height Above Nearest Drainage (HAND) mosaic is assembled from the HAND tiles that intersect
the geometry, using a HAND derived from the Copernicus GLO-30 DEM.
Note: `asf_tools` does not currently support geometries that cross the antimeridian.
Args:
vrt: Path for the output VRT file
geometry: Geometry in EPSG:4326 (lon/lat) projection for which to prepare a HAND mosaic
"""
with GDALConfigManager(GDAL_DISABLE_READDIR_ON_OPEN='EMPTY_DIR'):
if isinstance(geometry, BaseGeometry):
geometry = ogr.CreateGeometryFromWkb(geometry.wkb)
min_lon, max_lon, _, _ = geometry.GetEnvelope()
if min_lon < -160. and max_lon > 160.:
raise ValueError(f'asf_tools does not currently support geometries that cross the antimeridian: {geometry}')
tile_features = vector.get_features(HAND_GEOJSON)
if not vector.get_property_values_for_intersecting_features(geometry, tile_features):
raise ValueError(f'Copernicus GLO-30 HAND does not intersect this geometry: {geometry}')
hand_file_paths = vector.intersecting_feature_properties(geometry, tile_features, 'file_path')
gdal.BuildVRT(str(vrt), hand_file_paths)
raster
¶
convert_scale(array, in_scale, out_scale)
¶
Convert calibrated raster scale between db, amplitude and power
Source code in asf_tools/raster.py
def convert_scale(array: Union[np.ndarray, np.ma.MaskedArray], in_scale: Literal['db', 'amplitude', 'power'],
out_scale: Literal['db', 'amplitude', 'power']) -> Union[np.ndarray, np.ma.MaskedArray]:
"""Convert calibrated raster scale between db, amplitude and power"""
if in_scale == out_scale:
warnings.warn(f'Nothing to do! {in_scale} is same as {out_scale}.')
return array
log10 = np.ma.log10 if isinstance(array, np.ma.MaskedArray) else np.log10
if in_scale == 'db':
if out_scale == 'power':
return 10 ** (array / 10)
if out_scale == 'amplitude':
return 10 ** (array / 20)
if in_scale == 'amplitude':
if out_scale == 'power':
return array ** 2
if out_scale == 'db':
return 10 * log10(array ** 2)
if in_scale == 'power':
if out_scale == 'amplitude':
return np.sqrt(array)
if out_scale == 'db':
return 10 * log10(array)
raise ValueError(f'Cannot convert raster of scale {in_scale} to {out_scale}')
read_as_masked_array(raster, band=1)
¶
Reads data from a raster image into memory, masking invalid and NoData values
Parameters:
Name | Type | Description | Default |
---|---|---|---|
raster |
Union[str, pathlib.Path] |
The file path to a raster image |
required |
band |
int |
The raster band to read |
1 |
Returns:
Type | Description |
---|---|
data |
The raster pixel data as a numpy MaskedArray |
Source code in asf_tools/raster.py
def read_as_masked_array(raster: Union[str, Path], band: int = 1) -> np.ma.MaskedArray:
"""Reads data from a raster image into memory, masking invalid and NoData values
Args:
raster: The file path to a raster image
band: The raster band to read
Returns:
data: The raster pixel data as a numpy MaskedArray
"""
log.debug(f'Reading raster values from {raster}')
ds = gdal.Open(str(raster))
band = ds.GetRasterBand(band)
data = np.ma.masked_invalid(band.ReadAsArray())
nodata = band.GetNoDataValue()
if nodata is not None:
return np.ma.masked_values(data, nodata)
return data
threshold
¶
expectation_maximization_threshold(tile, number_of_classes=3)
¶
Water threshold Calculation using a multi-mode Expectation Maximization Approach
Thresholding works best when backscatter tiles are provided on a decibel scale to get Gaussian distribution that is scaled to a range of 0-255, and performed on a small tile that is likely to have a transition between water and not water.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
tile |
ndarray |
array of backscatter values for a tile from an RTC raster |
required |
number_of_classes |
int |
classify the tile into this many classes. Typically, three classes capture: (1) urban and bright slopes, (2) average brightness farmland, and (3) water, as is often seen in the US Midwest. |
3 |
Returns:
Type | Description |
---|---|
threshold |
threshold value that can be used to create a water extent map |
Source code in asf_tools/threshold.py
def expectation_maximization_threshold(tile: np.ndarray, number_of_classes: int = 3) -> float:
"""Water threshold Calculation using a multi-mode Expectation Maximization Approach
Thresholding works best when backscatter tiles are provided on a decibel scale
to get Gaussian distribution that is scaled to a range of 0-255, and performed
on a small tile that is likely to have a transition between water and not water.
Args:
tile: array of backscatter values for a tile from an RTC raster
number_of_classes: classify the tile into this many classes. Typically, three
classes capture: (1) urban and bright slopes, (2) average brightness farmland,
and (3) water, as is often seen in the US Midwest.
Returns:
threshold: threshold value that can be used to create a water extent map
"""
image_copy = tile.copy()
image_copy2 = np.ma.filled(tile.astype(float), np.nan) # needed for valid posterior_lookup keys
image = tile.flatten()
minimum = np.amin(image)
image = image - minimum + 1
maximum = np.amax(image)
histogram = _make_histogram(image)
nonzero_indices = np.nonzero(histogram)[0]
histogram = histogram[nonzero_indices]
histogram = histogram.flatten()
class_means = (
(np.arange(number_of_classes) + 1) * maximum /
(number_of_classes + 1)
)
class_variances = np.ones(number_of_classes) * maximum
class_proportions = np.ones(number_of_classes) * 1 / number_of_classes
sml = np.mean(np.diff(nonzero_indices)) / 1000
iteration = 0
while True:
class_likelihood = _make_distribution(
class_means, class_variances, class_proportions, nonzero_indices
)
sum_likelihood = np.sum(class_likelihood, 1) + np.finfo(
class_likelihood[0][0]).eps
log_likelihood = np.sum(histogram * np.log(sum_likelihood))
for j in range(0, number_of_classes):
class_posterior_probability = (
histogram * class_likelihood[:, j] / sum_likelihood
)
class_proportions[j] = np.sum(class_posterior_probability)
class_means[j] = (
np.sum(nonzero_indices * class_posterior_probability)
/ class_proportions[j]
)
vr = (nonzero_indices - class_means[j])
class_variances[j] = (
np.sum(vr * vr * class_posterior_probability)
/ class_proportions[j] + sml
)
del class_posterior_probability, vr
class_proportions += 1e-3
class_proportions /= np.sum(class_proportions)
class_likelihood = _make_distribution(
class_means, class_variances, class_proportions, nonzero_indices
)
sum_likelihood = np.sum(class_likelihood, 1) + np.finfo(
class_likelihood[0, 0]).eps
del class_likelihood
new_log_likelihood = np.sum(histogram * np.log(sum_likelihood))
del sum_likelihood
if (new_log_likelihood - log_likelihood) < 0.000001:
break
iteration += 1
del log_likelihood, new_log_likelihood
class_means = class_means + minimum - 1
s = image_copy.shape
posterior = np.zeros((s[0], s[1], number_of_classes))
posterior_lookup = dict()
for i in range(0, s[0]):
for j in range(0, s[1]):
pixel_val = image_copy2[i, j]
if pixel_val in posterior_lookup:
for n in range(0, number_of_classes):
posterior[i, j, n] = posterior_lookup[pixel_val][n]
else:
posterior_lookup.update({pixel_val: [0] * number_of_classes})
for n in range(0, number_of_classes):
x = _make_distribution(
class_means[n], class_variances[n], class_proportions[n],
image_copy[i, j]
)
posterior[i, j, n] = x * class_proportions[n]
posterior_lookup[pixel_val][n] = posterior[i, j, n]
sorti = np.argsort(class_means)
xvec = np.arange(class_means[sorti[0]], class_means[sorti[1]], step=.05)
x1 = _make_distribution(class_means[sorti[0]], class_variances[sorti[0]], class_proportions[sorti[0]], xvec)
x2 = _make_distribution(class_means[sorti[1]], class_variances[sorti[1]], class_proportions[sorti[1]], xvec)
dx = np.abs(x1 - x2)
return xvec[np.argmin(dx)]
tile
¶
tile_array(array, tile_shape=(200, 200), pad_value=None)
¶
Tile a 2D numpy array
Turn a 2D numpy array like: >>> array = [[0, 0, 1, 1], ... [0, 0, 1, 1], ... [2, 2, 3, 3], ... [2, 2, 3, 3]] >>> array.shape (4, 4)
into a tiled array like: >>> tiles = tiled_array(array, 2, 2) >>> print(tiles) [[[0, 0], [0, 0]], [[1, 1], [1, 1]], [[2, 2], [2, 2]], [[3, 3], [3, 3]]] >>> tiles.shape (4, 2, 2)
Parameters:
Name | Type | Description | Default |
---|---|---|---|
array |
Union[numpy.ndarray, numpy.ma.core.MaskedArray] |
2D array to tile |
required |
tile_shape |
Tuple[int, int] |
the shape of each tile |
(200, 200) |
pad_value |
float |
right-bottom pad |
None |
Returns:
Type | Description |
---|---|
Union[numpy.ndarray, numpy.ma.core.MaskedArray] |
the tiled array |
Source code in asf_tools/tile.py
def tile_array(array: Union[np.ndarray, np.ma.MaskedArray], tile_shape: Tuple[int, int] = (200, 200),
pad_value: float = None) -> Union[np.ndarray, np.ma.MaskedArray]:
"""Tile a 2D numpy array
Turn a 2D numpy array like:
>>> array = [[0, 0, 1, 1],
... [0, 0, 1, 1],
... [2, 2, 3, 3],
... [2, 2, 3, 3]]
>>> array.shape
(4, 4)
into a tiled array like:
>>> tiles = tiled_array(array, 2, 2)
>>> print(tiles)
[[[0, 0],
[0, 0]],
[[1, 1],
[1, 1]],
[[2, 2],
[2, 2]],
[[3, 3],
[3, 3]]]
>>> tiles.shape
(4, 2, 2)
Args:
array: 2D array to tile
tile_shape: the shape of each tile
pad_value: right-bottom pad `a` with `pad` as needed so `a` is evenly divisible into tiles
Returns:
the tiled array
"""
array_rows, array_columns = array.shape
tile_rows, tile_columns = tile_shape
# CREDIT: https://twitter.com/LizzUltee/status/1379508448262512641
rpad = -array_rows % tile_rows
cpad = -array_columns % tile_columns
if (rpad or cpad) and pad_value is None:
raise ValueError(f'Cannot evenly tile a {array.shape} array into ({tile_rows},{tile_columns}) tiles')
if rpad or cpad:
padded_array = np.pad(array, ((0, rpad), (0, cpad)), constant_values=pad_value)
if isinstance(array, np.ma.MaskedArray):
mask = np.pad(array.mask, ((0, rpad), (0, cpad)), constant_values=True)
padded_array = np.ma.MaskedArray(padded_array, mask=mask)
else:
padded_array = array
tile_list = []
for rows in np.vsplit(padded_array, range(tile_rows, array_rows, tile_rows)):
tile_list.extend(np.hsplit(rows, range(tile_columns, array_columns, tile_columns)))
dstack = np.ma.dstack if isinstance(array, np.ma.MaskedArray) else np.dstack
tiled = np.moveaxis(dstack(tile_list), -1, 0)
return tiled
untile_array(tiled_array, array_shape)
¶
Untile a tiled array into a 2D numpy array
This is the reverse of tile_array
and will turn a tiled array like:
>>> tiled_array = [[[0,0],
... [0,0]],
... [[1,1],
... [1,1]],
... [[2,2],
... [2,2]],
... [[3,3],
... [3,3]]]
>>> tiled_array.shape
(4, 2, 2)
into a 2D array like: >>> array = untile_array(tiled_array) >>> print(array) [[0, 0, 1, 1], [0, 0, 1, 1], [2, 2, 3, 3], [2, 2, 3, 3]] >>> array.shape (4, 4)
Parameters:
Name | Type | Description | Default |
---|---|---|---|
tiled_array |
Union[numpy.ndarray, numpy.ma.core.MaskedArray] |
a tiled array |
required |
array_shape |
Tuple[int, int] |
shape to untile the array to. If array_shape's size is smaller
than the tiled array, |
required |
Returns:
Type | Description |
---|---|
Union[numpy.ndarray, numpy.ma.core.MaskedArray] |
the untiled array |
Source code in asf_tools/tile.py
def untile_array(tiled_array: Union[np.ndarray, np.ma.MaskedArray], array_shape: Tuple[int, int]) \
-> Union[np.ndarray, np.ma.MaskedArray]:
"""Untile a tiled array into a 2D numpy array
This is the reverse of `tile_array` and will turn a tiled array like:
>>> tiled_array = [[[0,0],
... [0,0]],
... [[1,1],
... [1,1]],
... [[2,2],
... [2,2]],
... [[3,3],
... [3,3]]]
>>> tiled_array.shape
(4, 2, 2)
into a 2D array like:
>>> array = untile_array(tiled_array)
>>> print(array)
[[0, 0, 1, 1],
[0, 0, 1, 1],
[2, 2, 3, 3],
[2, 2, 3, 3]]
>>> array.shape
(4, 4)
Args:
tiled_array: a tiled array
array_shape: shape to untile the array to. If array_shape's size is smaller
than the tiled array, `untile_array` will subset the tiled array assuming
bottom right padding was added when tiling.
Returns:
the untiled array
"""
_, tile_rows, tile_columns = tiled_array.shape
array_rows, array_columns = array_shape
untiled_rows = int(np.ceil(array_rows / tile_rows))
untiled_columns = int(np.ceil(array_columns / tile_columns))
untiled = np.zeros((untiled_rows*tile_rows, untiled_columns*tile_columns), dtype=tiled_array.dtype)
if (array_size := array_rows * array_columns) > tiled_array.size:
raise ValueError(
f'array_shape {array_shape} will result in an array bigger than the tiled array:'
f' {array_size} > {tiled_array.size}'
)
for ii in range(untiled_rows):
for jj in range(untiled_columns):
untiled[ii*tile_rows:(ii+1)*tile_rows, jj*tile_columns:(jj+1)*tile_columns] = \
tiled_array[ii * untiled_columns + jj, :, :]
if isinstance(tiled_array, np.ma.MaskedArray):
untiled_mask = untile_array(tiled_array.mask, untiled.shape)
untiled = np.ma.MaskedArray(untiled, mask=untiled_mask)
return untiled[:array_rows, :array_columns]
util
¶
GDALConfigManager
¶
Context manager for setting GDAL config options temporarily
Source code in asf_tools/util.py
class GDALConfigManager:
"""Context manager for setting GDAL config options temporarily"""
def __init__(self, **options):
"""
Args:
**options: GDAL Config `option=value` keyword arguments.
"""
self.options = options.copy()
self._previous_options = {}
def __enter__(self):
for key in self.options:
self._previous_options[key] = gdal.GetConfigOption(key)
for key, value in self.options.items():
gdal.SetConfigOption(key, value)
def __exit__(self, exc_type, exc_val, exc_tb):
for key, value in self._previous_options.items():
gdal.SetConfigOption(key, value)
__init__(self, **options)
special
¶
Parameters:
Name | Type | Description | Default |
---|---|---|---|
**options |
GDAL Config |
{} |
Source code in asf_tools/util.py
def __init__(self, **options):
"""
Args:
**options: GDAL Config `option=value` keyword arguments.
"""
self.options = options.copy()
self._previous_options = {}
water_map
¶
Generate surface water maps from Sentinel-1 RTC products
Create a surface water extent map from a dual-pol Sentinel-1 RTC product and a HAND image. The HAND image must be pixel-aligned (same extent and size) to the RTC images. The water extent maps are created using an adaptive Expectation Maximization thresholding approach and refined using Fuzzy Logic.
make_water_map(out_raster, vv_raster, vh_raster, hand_raster=None, tile_shape=(100, 100), max_vv_threshold=-15.5, max_vh_threshold=-23.0, hand_threshold=15.0, hand_fraction=0.8, membership_threshold=0.45)
¶
Creates a surface water extent map from a Sentinel-1 RTC product
Create a surface water extent map from a dual-pol Sentinel-1 RTC product and a HAND image. The HAND image must be pixel-aligned (same extent and size) to the RTC images. The water extent maps are created using an adaptive Expectation Maximization thresholding approach and refined with Fuzzy Logic.
The input images are broken into a set of corresponding tiles with a shape of
tile_shape
, and a set of tiles are selected from the VH RTC
image that contain water boundaries to determine an appropriate water threshold.
Candidate tiles must meet these criteria:
* hand_fraction
of pixels within a tile must have HAND pixel values lower
than hand_threshold
* The median backscatter value for the tile must be lower than an average tiles'
backscatter values
* The tile must have a high variance -- high variance is considered initially to
be a variance in the 95th percentile of the tile variances, but progressively
relaxed to the 5th percentile if there not at least 5 candidate tiles.
The 5 VH tiles with the highest variance are selected for thresholding and a
water threshold value is determined using an Expectation Maximization approach.
If there were not enough candidate tiles or the threshold is too high,
max_vh_threshold
and/or max_vv_threshold
will be used instead.
From the initial threshold-based water extent maps, Fuzzy Logic is used to remove spurious false detections and improve the water extent map quality. The fuzzy logic uses these indicators for the presence of water: * radar cross section in a pixel relative to the determined detection threshold * the height above nearest drainage (HAND) * the surface slope, which is derived from the HAND data * the size of the detected water feature
For each indicator, a Z-shaped activation function is used to determine pixel membership.
The membership maps are combined to form the final water extent map. Pixels classified
as water pixels will:
* have non-zero membership in all of the indicators, and
* have an average membership above the membership_threshold
value.
Finally, the VV and VH water masks will be combined to include all water pixels
from both masks, and the combined water map will be written to out_raster
.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
out_raster |
Union[str, pathlib.Path] |
Water map GeoTIFF to create |
required |
vv_raster |
Union[str, pathlib.Path] |
Sentinel-1 RTC GeoTIFF, in power scale, with VV polarization |
required |
vh_raster |
Union[str, pathlib.Path] |
Sentinel-1 RTC GeoTIFF, in power scale, with VH polarization |
required |
hand_raster |
Union[str, pathlib.Path] |
Height Above Nearest Drainage (HAND) GeoTIFF aligned to the RTC rasters |
None |
tile_shape |
Tuple[int, int] |
shape (height, width) in pixels to tile the image to |
(100, 100) |
max_vv_threshold |
float |
Maximum threshold value to use for |
-15.5 |
max_vh_threshold |
float |
Maximum threshold value to use for |
-23.0 |
hand_threshold |
float |
The maximum height above nearest drainage in meters to consider a pixel valid |
15.0 |
hand_fraction |
float |
The minimum fraction of valid HAND pixels required in a tile for thresholding |
0.8 |
membership_threshold |
float |
The average membership to the fuzzy indicators required for a water pixel |
0.45 |
Source code in asf_tools/water_map.py
def make_water_map(out_raster: Union[str, Path], vv_raster: Union[str, Path], vh_raster: Union[str, Path],
hand_raster: Optional[Union[str, Path]] = None, tile_shape: Tuple[int, int] = (100, 100),
max_vv_threshold: float = -15.5, max_vh_threshold: float = -23.0,
hand_threshold: float = 15., hand_fraction: float = 0.8, membership_threshold: float = 0.45):
"""Creates a surface water extent map from a Sentinel-1 RTC product
Create a surface water extent map from a dual-pol Sentinel-1 RTC product and
a HAND image. The HAND image must be pixel-aligned (same extent and size) to
the RTC images. The water extent maps are created using an adaptive Expectation
Maximization thresholding approach and refined with Fuzzy Logic.
The input images are broken into a set of corresponding tiles with a shape of
`tile_shape`, and a set of tiles are selected from the VH RTC
image that contain water boundaries to determine an appropriate water threshold.
Candidate tiles must meet these criteria:
* `hand_fraction` of pixels within a tile must have HAND pixel values lower
than `hand_threshold`
* The median backscatter value for the tile must be lower than an average tiles'
backscatter values
* The tile must have a high variance -- high variance is considered initially to
be a variance in the 95th percentile of the tile variances, but progressively
relaxed to the 5th percentile if there not at least 5 candidate tiles.
The 5 VH tiles with the highest variance are selected for thresholding and a
water threshold value is determined using an Expectation Maximization approach.
If there were not enough candidate tiles or the threshold is too high,
`max_vh_threshold` and/or `max_vv_threshold` will be used instead.
From the initial threshold-based water extent maps, Fuzzy Logic is used to remove
spurious false detections and improve the water extent map quality. The fuzzy logic
uses these indicators for the presence of water:
* radar cross section in a pixel relative to the determined detection threshold
* the height above nearest drainage (HAND)
* the surface slope, which is derived from the HAND data
* the size of the detected water feature
For each indicator, a Z-shaped activation function is used to determine pixel membership.
The membership maps are combined to form the final water extent map. Pixels classified
as water pixels will:
* have non-zero membership in all of the indicators, and
* have an average membership above the `membership_threshold` value.
Finally, the VV and VH water masks will be combined to include all water pixels
from both masks, and the combined water map will be written to `out_raster`.
Args:
out_raster: Water map GeoTIFF to create
vv_raster: Sentinel-1 RTC GeoTIFF, in power scale, with VV polarization
vh_raster: Sentinel-1 RTC GeoTIFF, in power scale, with VH polarization
hand_raster: Height Above Nearest Drainage (HAND) GeoTIFF aligned to the RTC rasters
tile_shape: shape (height, width) in pixels to tile the image to
max_vv_threshold: Maximum threshold value to use for `vv_raster` in decibels (db)
max_vh_threshold: Maximum threshold value to use for `vh_raster` in decibels (db)
hand_threshold: The maximum height above nearest drainage in meters to consider
a pixel valid
hand_fraction: The minimum fraction of valid HAND pixels required in a tile for
thresholding
membership_threshold: The average membership to the fuzzy indicators required for a water pixel
"""
if tile_shape[0] % 2 or tile_shape[1] % 2:
raise ValueError(f'tile_shape {tile_shape} requires even values.')
info = gdal.Info(str(vh_raster), format='json')
out_transform = info['geoTransform']
out_epsg = get_epsg_code(info)
if hand_raster is None:
hand_raster = str(out_raster).replace('.tif', '_HAND.tif')
log.info(f'Extracting HAND data to: {hand_raster}')
prepare_hand_for_raster(hand_raster, vh_raster)
log.info(f'Determining HAND memberships from {hand_raster}')
hand_array = read_as_masked_array(hand_raster)
hand_tiles = tile_array(hand_array, tile_shape=tile_shape, pad_value=np.nan)
hand_candidates = select_hand_tiles(hand_tiles, hand_threshold, hand_fraction)
log.debug(f'Selected HAND tile candidates {hand_candidates}')
selected_tiles = None
water_extent_maps = []
for max_db_threshold, raster, pol in ((max_vh_threshold, vh_raster, 'VH'), (max_vv_threshold, vv_raster, 'VV')):
log.info(f'Creating initial {pol} water extent map from {raster}')
array = read_as_masked_array(raster)
tiles = tile_array(array, tile_shape=tile_shape, pad_value=0.)
# Masking less than zero only necessary for old HyP3/GAMMA products which sometimes returned negative powers
tiles = np.ma.masked_less_equal(tiles, 0.)
if selected_tiles is None:
selected_tiles = select_backscatter_tiles(tiles, hand_candidates)
log.info(f'Selected tiles {selected_tiles} from {raster}')
with np.testing.suppress_warnings() as sup:
sup.filter(RuntimeWarning) # invalid value and divide by zero encountered in log10
tiles = np.log10(tiles) + 30. # linear power scale -> Gaussian scale optimized for thresholding
max_gaussian_threshold = max_db_threshold / 10. + 30. # db -> Gaussian scale optimized for thresholding
if selected_tiles.size:
scaling = 256 / (np.mean(tiles) + 3 * np.std(tiles))
gaussian_threshold = determine_em_threshold(tiles[selected_tiles, :, :], scaling)
threshold_db = 10. * (gaussian_threshold - 30.)
log.info(f'Threshold determined to be {threshold_db} db')
if gaussian_threshold > max_gaussian_threshold:
log.warning(f'Threshold too high! Using maximum threshold {max_db_threshold} db')
gaussian_threshold = max_gaussian_threshold
else:
log.warning(f'Tile selection did not converge! using default threshold {max_db_threshold} db')
gaussian_threshold = max_gaussian_threshold
gaussian_array = untile_array(tiles, array.shape)
water_map = np.ma.masked_less_equal(gaussian_array, gaussian_threshold).mask
water_map &= ~array.mask
write_cog(str(out_raster).replace('.tif', f'_{pol}_initial.tif'), water_map, transform=out_transform,
epsg_code=out_epsg, dtype=gdal.GDT_Byte, nodata_value=False)
log.info(f'Refining initial {pol} water extent map using Fuzzy Logic')
array = np.ma.masked_where(~water_map, array)
gaussian_lower_limit = np.log10(np.ma.median(array)) + 30.
water_map = fuzzy_refinement(
water_map, gaussian_array, hand_array, pixel_size=out_transform[1],
gaussian_thresholds=(gaussian_lower_limit, gaussian_threshold), membership_threshold=membership_threshold
)
water_map &= ~array.mask
write_cog(str(out_raster).replace('.tif', f'_{pol}_fuzzy.tif'), water_map, transform=out_transform,
epsg_code=out_epsg, dtype=gdal.GDT_Byte, nodata_value=False)
water_extent_maps.append(water_map)
log.info('Combining Fuzzy VH and VV extent map')
combined_water_map = np.logical_or(*water_extent_maps)
combined_segments = measure.label(combined_water_map, connectivity=2)
combined_water_map = remove_small_segments(combined_segments)
write_cog(out_raster, combined_water_map, transform=out_transform,
epsg_code=out_epsg, dtype=gdal.GDT_Byte, nodata_value=False)