hyp3_sdk
v1.6.0 API Reference
A Python wrapper around the HyP3 API
exceptions
Errors and exceptions to raise when the SDK runs into problems

ASFSearchError (HyP3SDKError)
Raise for errors when using the ASF Search module
Source code in hyp3_sdk/exceptions.py

class ASFSearchError(HyP3SDKError):
    """Raise for errors when using the ASF Search module"""

AuthenticationError (HyP3SDKError)
Raise when authentication does not succeed
Source code in hyp3_sdk/exceptions.py

class AuthenticationError(HyP3SDKError):
    """Raise when authentication does not succeed"""

HyP3Error (HyP3SDKError)
Raise for errors when using the HyP3 module
Source code in hyp3_sdk/exceptions.py

class HyP3Error(HyP3SDKError):
    """Raise for errors when using the HyP3 module"""

HyP3SDKError (Exception)
Base Exception for the HyP3 SDK
Source code in hyp3_sdk/exceptions.py

class HyP3SDKError(Exception):
    """Base Exception for the HyP3 SDK"""

ServerError (HyP3SDKError)
Raise when the HyP3 SDK encounters a server error
Source code in hyp3_sdk/exceptions.py

class ServerError(HyP3SDKError):
    """Raise when the HyP3 SDK encounters a server error"""
hyp3

HyP3
A Python wrapper around the HyP3 API
Source code in hyp3_sdk/hyp3.py
class HyP3:
    """A Python wrapper around the HyP3 API"""

    def __init__(self, api_url: str = PROD_API, username: Optional[str] = None, password: Optional[str] = None,
                 prompt: bool = False):
        """
        Args:
            api_url: Address of the HyP3 API
            username: Username for authenticating to `urs.earthdata.nasa.gov`.
                Both username and password must be provided if either is provided.
            password: Password for authenticating to `urs.earthdata.nasa.gov`.
                Both username and password must be provided if either is provided.
            prompt: Prompt for username and/or password interactively when they
                are not provided as keyword parameters
        """
        self.url = api_url

        if username is None and prompt:
            username = input('NASA Earthdata Login username: ')
        if password is None and prompt:
            password = getpass('NASA Earthdata Login password: ')

        self.session = get_authenticated_session(username, password)
        self.session.headers.update({'User-Agent': f'{hyp3_sdk.__name__}/{hyp3_sdk.__version__}'})

    def find_jobs(self, start: Optional[datetime] = None, end: Optional[datetime] = None,
                  status_code: Optional[str] = None, name: Optional[str] = None,
                  job_type: Optional[str] = None) -> Batch:
        """Gets a Batch of jobs from HyP3 matching the provided search criteria

        Args:
            start: only jobs submitted after given time
            end: only jobs submitted before given time
            status_code: only jobs matching this status (SUCCEEDED, FAILED, RUNNING, PENDING)
            name: only jobs with this name
            job_type: only jobs with this job_type

        Returns:
            A Batch object containing the found jobs
        """
        params = {}
        for param_name in ('start', 'end', 'status_code', 'name', 'job_type'):
            param_value = locals().get(param_name)
            if param_value is not None:
                if isinstance(param_value, datetime):
                    if param_value.tzinfo is None:
                        param_value = param_value.replace(tzinfo=timezone.utc)
                    param_value = param_value.isoformat(timespec='seconds')
                params[param_name] = param_value

        response = self.session.get(urljoin(self.url, '/jobs'), params=params)
        _raise_for_hyp3_status(response)
        jobs = [Job.from_dict(job) for job in response.json()['jobs']]

        while 'next' in response.json():
            next_url = response.json()['next']
            response = self.session.get(next_url)
            _raise_for_hyp3_status(response)
            jobs.extend([Job.from_dict(job) for job in response.json()['jobs']])

        return Batch(jobs)

    def get_job_by_id(self, job_id: str) -> Job:
        """Get job by job ID

        Args:
            job_id: A job ID

        Returns:
            A Job object
        """
        response = self.session.get(urljoin(self.url, f'/jobs/{job_id}'))
        _raise_for_hyp3_status(response)
        return Job.from_dict(response.json())

    @singledispatchmethod
    def watch(self, job_or_batch: Union[Batch, Job], timeout: int = 10800,
              interval: Union[int, float] = 60) -> Union[Batch, Job]:
        """Watch jobs until they complete

        Args:
            job_or_batch: A Batch or Job object of jobs to watch
            timeout: How long to wait until exiting in seconds
            interval: How often to check for updates in seconds

        Returns:
            A Batch or Job object with refreshed watched jobs
        """
        raise NotImplementedError(f'Cannot watch {type(job_or_batch)} type object')

    @watch.register
    def _watch_batch(self, batch: Batch, timeout: int = 10800, interval: Union[int, float] = 60) -> Batch:
        tqdm = get_tqdm_progress_bar()
        iterations_until_timeout = math.ceil(timeout / interval)
        bar_format = '{l_bar}{bar}| {n_fmt}/{total_fmt} [{postfix[0]}]'
        with tqdm(total=len(batch), bar_format=bar_format, postfix=[f'timeout in {timeout} s']) as progress_bar:
            for ii in range(iterations_until_timeout):
                batch = self.refresh(batch)
                counts = batch._count_statuses()
                complete = counts['SUCCEEDED'] + counts['FAILED']

                progress_bar.postfix = [f'timeout in {timeout - ii * interval}s']
                # to control n/total manually; update is n += value
                progress_bar.n = complete
                progress_bar.update(0)

                if batch.complete():
                    return batch
                time.sleep(interval)
        raise HyP3Error(f'Timeout occurred while waiting for {batch}')

    @watch.register
    def _watch_job(self, job: Job, timeout: int = 10800, interval: Union[int, float] = 60) -> Job:
        tqdm = get_tqdm_progress_bar()
        iterations_until_timeout = math.ceil(timeout / interval)
        bar_format = '{n_fmt}/{total_fmt} [{postfix[0]}]'
        with tqdm(total=1, bar_format=bar_format, postfix=[f'timeout in {timeout} s']) as progress_bar:
            for ii in range(iterations_until_timeout):
                job = self.refresh(job)
                progress_bar.postfix = [f'timeout in {timeout - ii * interval}s']
                progress_bar.update(int(job.complete()))

                if job.complete():
                    return job
                time.sleep(interval)
        raise HyP3Error(f'Timeout occurred while waiting for {job}')

    @singledispatchmethod
    def refresh(self, job_or_batch: Union[Batch, Job]) -> Union[Batch, Job]:
        """Refresh each job's information

        Args:
            job_or_batch: A Batch or Job object to refresh

        Returns:
            A Batch or Job object with refreshed information
        """
        raise NotImplementedError(f'Cannot refresh {type(job_or_batch)} type object')

    @refresh.register
    def _refresh_batch(self, batch: Batch):
        jobs = []
        for job in batch.jobs:
            jobs.append(self.refresh(job))
        return Batch(jobs)

    @refresh.register
    def _refresh_job(self, job: Job):
        return self.get_job_by_id(job.job_id)

    def submit_prepared_jobs(self, prepared_jobs: Union[dict, List[dict]]) -> Batch:
        """Submit a prepared job dictionary, or list of prepared job dictionaries

        Args:
            prepared_jobs: A prepared job dictionary, or list of prepared job dictionaries

        Returns:
            A Batch object containing the submitted job(s)
        """
        if isinstance(prepared_jobs, dict):
            payload = {'jobs': [prepared_jobs]}
        else:
            payload = {'jobs': prepared_jobs}

        response = self.session.post(urljoin(self.url, '/jobs'), json=payload)
        _raise_for_hyp3_status(response)

        batch = Batch()
        for job in response.json()['jobs']:
            batch += Job.from_dict(job)
        return batch

    def submit_autorift_job(self, granule1: str, granule2: str, name: Optional[str] = None) -> Batch:
        """Submit an autoRIFT job

        Args:
            granule1: The first granule (scene) to use
            granule2: The second granule (scene) to use
            name: A name for the job

        Returns:
            A Batch object containing the autoRIFT job
        """
        job_dict = self.prepare_autorift_job(granule1, granule2, name=name)
        return self.submit_prepared_jobs(prepared_jobs=job_dict)

    @classmethod
    def prepare_autorift_job(cls, granule1: str, granule2: str, name: Optional[str] = None) -> dict:
        """Prepare an autoRIFT job

        Args:
            granule1: The first granule (scene) to use
            granule2: The second granule (scene) to use
            name: A name for the job

        Returns:
            A dictionary containing the prepared autoRIFT job
        """
        job_dict = {
            'job_parameters': {'granules': [granule1, granule2]},
            'job_type': 'AUTORIFT',
        }
        if name is not None:
            job_dict['name'] = name
        return job_dict

    def submit_rtc_job(self,
                       granule: str,
                       name: Optional[str] = None,
                       dem_matching: bool = False,
                       include_dem: bool = False,
                       include_inc_map: bool = False,
                       include_rgb: bool = False,
                       include_scattering_area: bool = False,
                       radiometry: Literal['sigma0', 'gamma0'] = 'gamma0',
                       resolution: Literal[30] = 30,
                       scale: Literal['amplitude', 'power'] = 'power',
                       speckle_filter: bool = False,
                       dem_name: Literal['copernicus', 'legacy'] = 'copernicus') -> Batch:
        """Submit an RTC job

        Args:
            granule: The granule (scene) to use
            name: A name for the job
            dem_matching: Coregisters SAR data to the DEM, rather than using dead reckoning based on orbit files
            include_dem: Include the DEM file in the product package
            include_inc_map: Include the local incidence angle map in the product package
            include_rgb: Include a false-color RGB decomposition in the product package for dual-pol granules
                (ignored for single-pol granules)
            include_scattering_area: Include the scattering area in the product package
            radiometry: Backscatter coefficient normalization, either by ground area (sigma0) or illuminated area
                projected into the look direction (gamma0)
            resolution: Desired output pixel spacing in meters
            scale: Scale of output image; either power or amplitude
            speckle_filter: Apply an Enhanced Lee speckle filter
            dem_name: Name of the DEM to use for processing. `copernicus` will use the Copernicus GLO-30 Public DEM,
                while `legacy` will use the DEM with the best coverage from ASF's legacy SRTM/NED datasets.

        Returns:
            A Batch object containing the RTC job
        """
        arguments = locals()
        arguments.pop('self')
        job_dict = self.prepare_rtc_job(**arguments)
        return self.submit_prepared_jobs(prepared_jobs=job_dict)

    @classmethod
    def prepare_rtc_job(cls,
                        granule: str,
                        name: Optional[str] = None,
                        dem_matching: bool = False,
                        include_dem: bool = False,
                        include_inc_map: bool = False,
                        include_rgb: bool = False,
                        include_scattering_area: bool = False,
                        radiometry: Literal['sigma0', 'gamma0'] = 'gamma0',
                        resolution: Literal[30] = 30,
                        scale: Literal['amplitude', 'power'] = 'power',
                        speckle_filter: bool = False,
                        dem_name: Literal['copernicus', 'legacy'] = 'copernicus') -> dict:
        """Prepare an RTC job

        Args:
            granule: The granule (scene) to use
            name: A name for the job
            dem_matching: Coregisters SAR data to the DEM, rather than using dead reckoning based on orbit files
            include_dem: Include the DEM file in the product package
            include_inc_map: Include the local incidence angle map in the product package
            include_rgb: Include a false-color RGB decomposition in the product package for dual-pol granules
                (ignored for single-pol granules)
            include_scattering_area: Include the scattering area in the product package
            radiometry: Backscatter coefficient normalization, either by ground area (sigma0) or illuminated area
                projected into the look direction (gamma0)
            resolution: Desired output pixel spacing in meters
            scale: Scale of output image; either power or amplitude
            speckle_filter: Apply an Enhanced Lee speckle filter
            dem_name: Name of the DEM to use for processing. `copernicus` will use the Copernicus GLO-30 Public DEM,
                while `legacy` will use the DEM with the best coverage from ASF's legacy SRTM/NED datasets.

        Returns:
            A dictionary containing the prepared RTC job
        """
        job_parameters = locals().copy()
        for key in ['granule', 'name', 'cls']:
            job_parameters.pop(key, None)

        job_dict = {
            'job_parameters': {'granules': [granule], **job_parameters},
            'job_type': 'RTC_GAMMA',
        }
        if name is not None:
            job_dict['name'] = name
        return job_dict

    def submit_insar_job(self,
                         granule1: str,
                         granule2: str,
                         name: Optional[str] = None,
                         include_look_vectors: bool = False,
                         include_los_displacement: bool = False,
                         include_inc_map: bool = False,
                         looks: Literal['20x4', '10x2'] = '20x4',
                         include_dem: bool = False,
                         include_wrapped_phase: bool = False,
                         apply_water_mask: bool = False,
                         include_displacement_maps: bool = False) -> Batch:
        """Submit an InSAR job

        Args:
            granule1: The first granule (scene) to use
            granule2: The second granule (scene) to use
            name: A name for the job
            include_look_vectors: Include the look vector theta and phi files in the product package
            include_los_displacement: Include a GeoTIFF in the product package containing displacement values
                along the Line-Of-Sight (LOS). This parameter has been deprecated in favor of
                `include_displacement_maps`, and will be removed in a future release.
            include_inc_map: Include the local and ellipsoidal incidence angle maps in the product package
            looks: Number of looks to take in range and azimuth
            include_dem: Include the digital elevation model GeoTIFF in the product package
            include_wrapped_phase: Include the wrapped phase GeoTIFF in the product package
            apply_water_mask: Sets pixels over coastal waters and large inland waterbodies
                as invalid for phase unwrapping
            include_displacement_maps: Include displacement maps (line-of-sight and vertical) in the product package

        Returns:
            A Batch object containing the InSAR job
        """
        arguments = locals().copy()
        arguments.pop('self')
        job_dict = self.prepare_insar_job(**arguments)
        return self.submit_prepared_jobs(prepared_jobs=job_dict)

    @classmethod
    def prepare_insar_job(cls,
                          granule1: str,
                          granule2: str,
                          name: Optional[str] = None,
                          include_look_vectors: bool = False,
                          include_los_displacement: bool = False,
                          include_inc_map: bool = False,
                          looks: Literal['20x4', '10x2'] = '20x4',
                          include_dem: bool = False,
                          include_wrapped_phase: bool = False,
                          apply_water_mask: bool = False,
                          include_displacement_maps: bool = False) -> dict:
        """Prepare an InSAR job

        Args:
            granule1: The first granule (scene) to use
            granule2: The second granule (scene) to use
            name: A name for the job
            include_look_vectors: Include the look vector theta and phi files in the product package
            include_los_displacement: Include a GeoTIFF in the product package containing displacement values
                along the Line-Of-Sight (LOS). This parameter has been deprecated in favor of
                `include_displacement_maps`, and will be removed in a future release.
            include_inc_map: Include the local and ellipsoidal incidence angle maps in the product package
            looks: Number of looks to take in range and azimuth
            include_dem: Include the digital elevation model GeoTIFF in the product package
            include_wrapped_phase: Include the wrapped phase GeoTIFF in the product package
            apply_water_mask: Sets pixels over coastal waters and large inland waterbodies
                as invalid for phase unwrapping
            include_displacement_maps: Include displacement maps (line-of-sight and vertical) in the product package

        Returns:
            A dictionary containing the prepared InSAR job
        """
        if include_los_displacement:
            warnings.warn('The include_los_displacement parameter has been deprecated in favor of '
                          'include_displacement_maps, and will be removed in a future release.', FutureWarning)

        job_parameters = locals().copy()
        for key in ['cls', 'granule1', 'granule2', 'name']:
            job_parameters.pop(key)

        job_dict = {
            'job_parameters': {'granules': [granule1, granule2], **job_parameters},
            'job_type': 'INSAR_GAMMA',
        }
        if name is not None:
            job_dict['name'] = name
        return job_dict

    def my_info(self) -> dict:
        """
        Returns:
            Your user information
        """
        response = self.session.get(urljoin(self.url, '/user'))
        _raise_for_hyp3_status(response)
        return response.json()

    def check_quota(self) -> Optional[int]:
        """
        Returns:
            The number of jobs left in your quota, or None if you have no quota
        """
        info = self.my_info()
        return info['quota']['remaining']
__init__(self, api_url='https://hyp3-api.asf.alaska.edu', username=None, password=None, prompt=False) special

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| api_url | str | Address of the HyP3 API | 'https://hyp3-api.asf.alaska.edu' |
| username | Optional[str] | Username for authenticating to `urs.earthdata.nasa.gov`. Both username and password must be provided if either is provided. | None |
| password | Optional[str] | Password for authenticating to `urs.earthdata.nasa.gov`. Both username and password must be provided if either is provided. | None |
| prompt | bool | Prompt for username and/or password interactively when they are not provided as keyword parameters | False |

Source code in hyp3_sdk/hyp3.py

def __init__(self, api_url: str = PROD_API, username: Optional[str] = None, password: Optional[str] = None,
             prompt: bool = False):
    """
    Args:
        api_url: Address of the HyP3 API
        username: Username for authenticating to `urs.earthdata.nasa.gov`.
            Both username and password must be provided if either is provided.
        password: Password for authenticating to `urs.earthdata.nasa.gov`.
            Both username and password must be provided if either is provided.
        prompt: Prompt for username and/or password interactively when they
            are not provided as keyword parameters
    """
    self.url = api_url

    if username is None and prompt:
        username = input('NASA Earthdata Login username: ')
    if password is None and prompt:
        password = getpass('NASA Earthdata Login password: ')

    self.session = get_authenticated_session(username, password)
    self.session.headers.update({'User-Agent': f'{hyp3_sdk.__name__}/{hyp3_sdk.__version__}'})
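For example, a client can be constructed with explicit Earthdata Login credentials or by prompting interactively. A minimal sketch, assuming the top-level export `hyp3_sdk.HyP3` and placeholder credential values:

from hyp3_sdk import HyP3

# Authenticate with explicit (placeholder) credentials...
hyp3 = HyP3(username='my_edl_username', password='my_edl_password')

# ...or fall back to a ~/.netrc entry, prompting only if needed
hyp3 = HyP3(prompt=True)
print(hyp3.check_quota())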
check_quota(self)

Returns:

| Type | Description |
|---|---|
| Optional[int] | The number of jobs left in your quota, or None if you have no quota |

Source code in hyp3_sdk/hyp3.py

def check_quota(self) -> Optional[int]:
    """
    Returns:
        The number of jobs left in your quota, or None if you have no quota
    """
    info = self.my_info()
    return info['quota']['remaining']
find_jobs(self, start=None, end=None, status_code=None, name=None, job_type=None)

Gets a Batch of jobs from HyP3 matching the provided search criteria

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| start | Optional[datetime.datetime] | only jobs submitted after given time | None |
| end | Optional[datetime.datetime] | only jobs submitted before given time | None |
| status_code | Optional[str] | only jobs matching this status (SUCCEEDED, FAILED, RUNNING, PENDING) | None |
| name | Optional[str] | only jobs with this name | None |
| job_type | Optional[str] | only jobs with this job_type | None |

Returns:

| Type | Description |
|---|---|
| Batch | A Batch object containing the found jobs |

Source code in hyp3_sdk/hyp3.py

def find_jobs(self, start: Optional[datetime] = None, end: Optional[datetime] = None,
              status_code: Optional[str] = None, name: Optional[str] = None,
              job_type: Optional[str] = None) -> Batch:
    """Gets a Batch of jobs from HyP3 matching the provided search criteria

    Args:
        start: only jobs submitted after given time
        end: only jobs submitted before given time
        status_code: only jobs matching this status (SUCCEEDED, FAILED, RUNNING, PENDING)
        name: only jobs with this name
        job_type: only jobs with this job_type

    Returns:
        A Batch object containing the found jobs
    """
    params = {}
    for param_name in ('start', 'end', 'status_code', 'name', 'job_type'):
        param_value = locals().get(param_name)
        if param_value is not None:
            if isinstance(param_value, datetime):
                if param_value.tzinfo is None:
                    param_value = param_value.replace(tzinfo=timezone.utc)
                param_value = param_value.isoformat(timespec='seconds')
            params[param_name] = param_value

    response = self.session.get(urljoin(self.url, '/jobs'), params=params)
    _raise_for_hyp3_status(response)
    jobs = [Job.from_dict(job) for job in response.json()['jobs']]

    while 'next' in response.json():
        next_url = response.json()['next']
        response = self.session.get(next_url)
        _raise_for_hyp3_status(response)
        jobs.extend([Job.from_dict(job) for job in response.json()['jobs']])

    return Batch(jobs)
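For example, a search can be narrowed by submission time and status. A minimal sketch, assuming `hyp3` is an authenticated HyP3 instance; per the source above, naive datetimes are interpreted as UTC:

from datetime import datetime

# Succeeded RTC jobs submitted on or after 1 January 2021 (treated as UTC)
batch = hyp3.find_jobs(
    start=datetime(2021, 1, 1),
    status_code='SUCCEEDED',
    job_type='RTC_GAMMA',
)
print(batch)  # e.g. '12 HyP3 Jobs: 12 succeeded, 0 failed, 0 running, 0 pending.'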
get_job_by_id(self, job_id)

Get job by job ID

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| job_id | str | A job ID | required |

Returns:

| Type | Description |
|---|---|
| Job | A Job object |

Source code in hyp3_sdk/hyp3.py

def get_job_by_id(self, job_id: str) -> Job:
    """Get job by job ID

    Args:
        job_id: A job ID

    Returns:
        A Job object
    """
    response = self.session.get(urljoin(self.url, f'/jobs/{job_id}'))
    _raise_for_hyp3_status(response)
    return Job.from_dict(response.json())
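A short sketch of looking a job back up by its ID, where `job_id` is assumed to come from an earlier submission or `find_jobs` call:

job = hyp3.get_job_by_id(job_id)
print(job)                            # Job.__str__, e.g. 'HyP3 RTC_GAMMA job <job_id>'
print(job.status_code, job.complete())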
my_info(self)

Returns:

| Type | Description |
|---|---|
| dict | Your user information |

Source code in hyp3_sdk/hyp3.py

def my_info(self) -> dict:
    """
    Returns:
        Your user information
    """
    response = self.session.get(urljoin(self.url, '/user'))
    _raise_for_hyp3_status(response)
    return response.json()
prepare_autorift_job(granule1, granule2, name=None) classmethod

Prepare an autoRIFT job

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| granule1 | str | The first granule (scene) to use | required |
| granule2 | str | The second granule (scene) to use | required |
| name | Optional[str] | A name for the job | None |

Returns:

| Type | Description |
|---|---|
| dict | A dictionary containing the prepared autoRIFT job |

Source code in hyp3_sdk/hyp3.py

@classmethod
def prepare_autorift_job(cls, granule1: str, granule2: str, name: Optional[str] = None) -> dict:
    """Prepare an autoRIFT job

    Args:
        granule1: The first granule (scene) to use
        granule2: The second granule (scene) to use
        name: A name for the job

    Returns:
        A dictionary containing the prepared autoRIFT job
    """
    job_dict = {
        'job_parameters': {'granules': [granule1, granule2]},
        'job_type': 'AUTORIFT',
    }
    if name is not None:
        job_dict['name'] = name
    return job_dict
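Following the source above, the returned dictionary has exactly this shape; the granule names here are placeholders, not real scene names:

job_dict = HyP3.prepare_autorift_job('GRANULE_1', 'GRANULE_2', name='autorift-example')
# job_dict == {
#     'job_parameters': {'granules': ['GRANULE_1', 'GRANULE_2']},
#     'job_type': 'AUTORIFT',
#     'name': 'autorift-example',
# }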
prepare_insar_job(granule1, granule2, name=None, include_look_vectors=False, include_los_displacement=False, include_inc_map=False, looks='20x4', include_dem=False, include_wrapped_phase=False, apply_water_mask=False, include_displacement_maps=False) classmethod

Prepare an InSAR job

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| granule1 | str | The first granule (scene) to use | required |
| granule2 | str | The second granule (scene) to use | required |
| name | Optional[str] | A name for the job | None |
| include_look_vectors | bool | Include the look vector theta and phi files in the product package | False |
| include_los_displacement | bool | Include a GeoTIFF in the product package containing displacement values along the Line-Of-Sight (LOS). This parameter has been deprecated in favor of `include_displacement_maps`, and will be removed in a future release. | False |
| include_inc_map | bool | Include the local and ellipsoidal incidence angle maps in the product package | False |
| looks | Literal['20x4', '10x2'] | Number of looks to take in range and azimuth | '20x4' |
| include_dem | bool | Include the digital elevation model GeoTIFF in the product package | False |
| include_wrapped_phase | bool | Include the wrapped phase GeoTIFF in the product package | False |
| apply_water_mask | bool | Sets pixels over coastal waters and large inland waterbodies as invalid for phase unwrapping | False |
| include_displacement_maps | bool | Include displacement maps (line-of-sight and vertical) in the product package | False |

Returns:

| Type | Description |
|---|---|
| dict | A dictionary containing the prepared InSAR job |

Source code in hyp3_sdk/hyp3.py

@classmethod
def prepare_insar_job(cls,
                      granule1: str,
                      granule2: str,
                      name: Optional[str] = None,
                      include_look_vectors: bool = False,
                      include_los_displacement: bool = False,
                      include_inc_map: bool = False,
                      looks: Literal['20x4', '10x2'] = '20x4',
                      include_dem: bool = False,
                      include_wrapped_phase: bool = False,
                      apply_water_mask: bool = False,
                      include_displacement_maps: bool = False) -> dict:
    """Prepare an InSAR job

    Args:
        granule1: The first granule (scene) to use
        granule2: The second granule (scene) to use
        name: A name for the job
        include_look_vectors: Include the look vector theta and phi files in the product package
        include_los_displacement: Include a GeoTIFF in the product package containing displacement values
            along the Line-Of-Sight (LOS). This parameter has been deprecated in favor of
            `include_displacement_maps`, and will be removed in a future release.
        include_inc_map: Include the local and ellipsoidal incidence angle maps in the product package
        looks: Number of looks to take in range and azimuth
        include_dem: Include the digital elevation model GeoTIFF in the product package
        include_wrapped_phase: Include the wrapped phase GeoTIFF in the product package
        apply_water_mask: Sets pixels over coastal waters and large inland waterbodies
            as invalid for phase unwrapping
        include_displacement_maps: Include displacement maps (line-of-sight and vertical) in the product package

    Returns:
        A dictionary containing the prepared InSAR job
    """
    if include_los_displacement:
        warnings.warn('The include_los_displacement parameter has been deprecated in favor of '
                      'include_displacement_maps, and will be removed in a future release.', FutureWarning)

    job_parameters = locals().copy()
    for key in ['cls', 'granule1', 'granule2', 'name']:
        job_parameters.pop(key)

    job_dict = {
        'job_parameters': {'granules': [granule1, granule2], **job_parameters},
        'job_type': 'INSAR_GAMMA',
    }
    if name is not None:
        job_dict['name'] = name
    return job_dict
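Because this is a classmethod, job dictionaries can be prepared without a client and submitted together later. A sketch, assuming a hypothetical `granule_pairs` iterable of (reference, secondary) scene names:

prepared = [
    HyP3.prepare_insar_job(reference, secondary, looks='10x2', include_dem=True)
    for reference, secondary in granule_pairs
]
batch = hyp3.submit_prepared_jobs(prepared)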
prepare_rtc_job(granule, name=None, dem_matching=False, include_dem=False, include_inc_map=False, include_rgb=False, include_scattering_area=False, radiometry='gamma0', resolution=30, scale='power', speckle_filter=False, dem_name='copernicus') classmethod

Prepare an RTC job

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| granule | str | The granule (scene) to use | required |
| name | Optional[str] | A name for the job | None |
| dem_matching | bool | Coregisters SAR data to the DEM, rather than using dead reckoning based on orbit files | False |
| include_dem | bool | Include the DEM file in the product package | False |
| include_inc_map | bool | Include the local incidence angle map in the product package | False |
| include_rgb | bool | Include a false-color RGB decomposition in the product package for dual-pol granules (ignored for single-pol granules) | False |
| include_scattering_area | bool | Include the scattering area in the product package | False |
| radiometry | Literal['sigma0', 'gamma0'] | Backscatter coefficient normalization, either by ground area (sigma0) or illuminated area projected into the look direction (gamma0) | 'gamma0' |
| resolution | Literal[30] | Desired output pixel spacing in meters | 30 |
| scale | Literal['amplitude', 'power'] | Scale of output image; either power or amplitude | 'power' |
| speckle_filter | bool | Apply an Enhanced Lee speckle filter | False |
| dem_name | Literal['copernicus', 'legacy'] | Name of the DEM to use for processing. `copernicus` will use the Copernicus GLO-30 Public DEM, while `legacy` will use the DEM with the best coverage from ASF's legacy SRTM/NED datasets. | 'copernicus' |

Returns:

| Type | Description |
|---|---|
| dict | A dictionary containing the prepared RTC job |

Source code in hyp3_sdk/hyp3.py

@classmethod
def prepare_rtc_job(cls,
                    granule: str,
                    name: Optional[str] = None,
                    dem_matching: bool = False,
                    include_dem: bool = False,
                    include_inc_map: bool = False,
                    include_rgb: bool = False,
                    include_scattering_area: bool = False,
                    radiometry: Literal['sigma0', 'gamma0'] = 'gamma0',
                    resolution: Literal[30] = 30,
                    scale: Literal['amplitude', 'power'] = 'power',
                    speckle_filter: bool = False,
                    dem_name: Literal['copernicus', 'legacy'] = 'copernicus') -> dict:
    """Prepare an RTC job

    Args:
        granule: The granule (scene) to use
        name: A name for the job
        dem_matching: Coregisters SAR data to the DEM, rather than using dead reckoning based on orbit files
        include_dem: Include the DEM file in the product package
        include_inc_map: Include the local incidence angle map in the product package
        include_rgb: Include a false-color RGB decomposition in the product package for dual-pol granules
            (ignored for single-pol granules)
        include_scattering_area: Include the scattering area in the product package
        radiometry: Backscatter coefficient normalization, either by ground area (sigma0) or illuminated area
            projected into the look direction (gamma0)
        resolution: Desired output pixel spacing in meters
        scale: Scale of output image; either power or amplitude
        speckle_filter: Apply an Enhanced Lee speckle filter
        dem_name: Name of the DEM to use for processing. `copernicus` will use the Copernicus GLO-30 Public DEM,
            while `legacy` will use the DEM with the best coverage from ASF's legacy SRTM/NED datasets.

    Returns:
        A dictionary containing the prepared RTC job
    """
    job_parameters = locals().copy()
    for key in ['granule', 'name', 'cls']:
        job_parameters.pop(key, None)

    job_dict = {
        'job_parameters': {'granules': [granule], **job_parameters},
        'job_type': 'RTC_GAMMA',
    }
    if name is not None:
        job_dict['name'] = name
    return job_dict
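Per the source above, everything except `granule` and `name` is passed through under 'job_parameters'. A sketch with a placeholder granule name:

rtc_dict = HyP3.prepare_rtc_job('GRANULE_1', name='rtc-example', radiometry='sigma0', speckle_filter=True)
# rtc_dict['job_type'] == 'RTC_GAMMA'
# rtc_dict['job_parameters']['granules'] == ['GRANULE_1']
# rtc_dict['job_parameters']['radiometry'] == 'sigma0'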
refresh(self, job_or_batch)

Refresh each job's information

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| job_or_batch | Union[hyp3_sdk.jobs.Batch, hyp3_sdk.jobs.Job] | A Batch or Job object to refresh | required |

Returns:

| Type | Description |
|---|---|
| Union[hyp3_sdk.jobs.Batch, hyp3_sdk.jobs.Job] | A Batch or Job object with refreshed information |

Source code in hyp3_sdk/hyp3.py

@singledispatchmethod
def refresh(self, job_or_batch: Union[Batch, Job]) -> Union[Batch, Job]:
    """Refresh each job's information

    Args:
        job_or_batch: A Batch or Job object to refresh

    Returns:
        A Batch or Job object with refreshed information
    """
    raise NotImplementedError(f'Cannot refresh {type(job_or_batch)} type object')
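A sketch of a single, non-blocking status refresh; contrast with watch, which polls in a loop until completion:

batch = hyp3.refresh(batch)  # one round of GET requests, one per job
print(batch)                 # status counts now reflect the refreshed jobs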
submit_autorift_job(self, granule1, granule2, name=None)

Submit an autoRIFT job

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| granule1 | str | The first granule (scene) to use | required |
| granule2 | str | The second granule (scene) to use | required |
| name | Optional[str] | A name for the job | None |

Returns:

| Type | Description |
|---|---|
| Batch | A Batch object containing the autoRIFT job |

Source code in hyp3_sdk/hyp3.py

def submit_autorift_job(self, granule1: str, granule2: str, name: Optional[str] = None) -> Batch:
    """Submit an autoRIFT job

    Args:
        granule1: The first granule (scene) to use
        granule2: The second granule (scene) to use
        name: A name for the job

    Returns:
        A Batch object containing the autoRIFT job
    """
    job_dict = self.prepare_autorift_job(granule1, granule2, name=name)
    return self.submit_prepared_jobs(prepared_jobs=job_dict)
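A minimal sketch of submitting a scene pair and waiting for it; the granule names are placeholders:

batch = hyp3.submit_autorift_job('GRANULE_1', 'GRANULE_2', name='autorift-example')
batch = hyp3.watch(batch)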
submit_insar_job(self, granule1, granule2, name=None, include_look_vectors=False, include_los_displacement=False, include_inc_map=False, looks='20x4', include_dem=False, include_wrapped_phase=False, apply_water_mask=False, include_displacement_maps=False)

Submit an InSAR job

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| granule1 | str | The first granule (scene) to use | required |
| granule2 | str | The second granule (scene) to use | required |
| name | Optional[str] | A name for the job | None |
| include_look_vectors | bool | Include the look vector theta and phi files in the product package | False |
| include_los_displacement | bool | Include a GeoTIFF in the product package containing displacement values along the Line-Of-Sight (LOS). This parameter has been deprecated in favor of `include_displacement_maps`, and will be removed in a future release. | False |
| include_inc_map | bool | Include the local and ellipsoidal incidence angle maps in the product package | False |
| looks | Literal['20x4', '10x2'] | Number of looks to take in range and azimuth | '20x4' |
| include_dem | bool | Include the digital elevation model GeoTIFF in the product package | False |
| include_wrapped_phase | bool | Include the wrapped phase GeoTIFF in the product package | False |
| apply_water_mask | bool | Sets pixels over coastal waters and large inland waterbodies as invalid for phase unwrapping | False |
| include_displacement_maps | bool | Include displacement maps (line-of-sight and vertical) in the product package | False |

Returns:

| Type | Description |
|---|---|
| Batch | A Batch object containing the InSAR job |

Source code in hyp3_sdk/hyp3.py

def submit_insar_job(self,
                     granule1: str,
                     granule2: str,
                     name: Optional[str] = None,
                     include_look_vectors: bool = False,
                     include_los_displacement: bool = False,
                     include_inc_map: bool = False,
                     looks: Literal['20x4', '10x2'] = '20x4',
                     include_dem: bool = False,
                     include_wrapped_phase: bool = False,
                     apply_water_mask: bool = False,
                     include_displacement_maps: bool = False) -> Batch:
    """Submit an InSAR job

    Args:
        granule1: The first granule (scene) to use
        granule2: The second granule (scene) to use
        name: A name for the job
        include_look_vectors: Include the look vector theta and phi files in the product package
        include_los_displacement: Include a GeoTIFF in the product package containing displacement values
            along the Line-Of-Sight (LOS). This parameter has been deprecated in favor of
            `include_displacement_maps`, and will be removed in a future release.
        include_inc_map: Include the local and ellipsoidal incidence angle maps in the product package
        looks: Number of looks to take in range and azimuth
        include_dem: Include the digital elevation model GeoTIFF in the product package
        include_wrapped_phase: Include the wrapped phase GeoTIFF in the product package
        apply_water_mask: Sets pixels over coastal waters and large inland waterbodies
            as invalid for phase unwrapping
        include_displacement_maps: Include displacement maps (line-of-sight and vertical) in the product package

    Returns:
        A Batch object containing the InSAR job
    """
    arguments = locals().copy()
    arguments.pop('self')
    job_dict = self.prepare_insar_job(**arguments)
    return self.submit_prepared_jobs(prepared_jobs=job_dict)
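A sketch with placeholder granule names, using the non-deprecated displacement option:

batch = hyp3.submit_insar_job(
    'REFERENCE_GRANULE', 'SECONDARY_GRANULE',
    name='insar-example',
    include_displacement_maps=True,  # preferred over the deprecated include_los_displacement
)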
submit_prepared_jobs(self, prepared_jobs)

Submit a prepared job dictionary, or list of prepared job dictionaries

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| prepared_jobs | Union[dict, List[dict]] | A prepared job dictionary, or list of prepared job dictionaries | required |

Returns:

| Type | Description |
|---|---|
| Batch | A Batch object containing the submitted job(s) |

Source code in hyp3_sdk/hyp3.py

def submit_prepared_jobs(self, prepared_jobs: Union[dict, List[dict]]) -> Batch:
    """Submit a prepared job dictionary, or list of prepared job dictionaries

    Args:
        prepared_jobs: A prepared job dictionary, or list of prepared job dictionaries

    Returns:
        A Batch object containing the submitted job(s)
    """
    if isinstance(prepared_jobs, dict):
        payload = {'jobs': [prepared_jobs]}
    else:
        payload = {'jobs': prepared_jobs}

    response = self.session.post(urljoin(self.url, '/jobs'), json=payload)
    _raise_for_hyp3_status(response)

    batch = Batch()
    for job in response.json()['jobs']:
        batch += Job.from_dict(job)
    return batch
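Because a single dictionary or a list is accepted, jobs from the prepare_* classmethods can be queued up and submitted in one POST. A sketch with placeholder scene names:

prepared = [
    HyP3.prepare_autorift_job('GRANULE_1', 'GRANULE_2'),
    HyP3.prepare_rtc_job('GRANULE_3', name='rtc-example'),
]
batch = hyp3.submit_prepared_jobs(prepared)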
submit_rtc_job(self, granule, name=None, dem_matching=False, include_dem=False, include_inc_map=False, include_rgb=False, include_scattering_area=False, radiometry='gamma0', resolution=30, scale='power', speckle_filter=False, dem_name='copernicus')

Submit an RTC job

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| granule | str | The granule (scene) to use | required |
| name | Optional[str] | A name for the job | None |
| dem_matching | bool | Coregisters SAR data to the DEM, rather than using dead reckoning based on orbit files | False |
| include_dem | bool | Include the DEM file in the product package | False |
| include_inc_map | bool | Include the local incidence angle map in the product package | False |
| include_rgb | bool | Include a false-color RGB decomposition in the product package for dual-pol granules (ignored for single-pol granules) | False |
| include_scattering_area | bool | Include the scattering area in the product package | False |
| radiometry | Literal['sigma0', 'gamma0'] | Backscatter coefficient normalization, either by ground area (sigma0) or illuminated area projected into the look direction (gamma0) | 'gamma0' |
| resolution | Literal[30] | Desired output pixel spacing in meters | 30 |
| scale | Literal['amplitude', 'power'] | Scale of output image; either power or amplitude | 'power' |
| speckle_filter | bool | Apply an Enhanced Lee speckle filter | False |
| dem_name | Literal['copernicus', 'legacy'] | Name of the DEM to use for processing. `copernicus` will use the Copernicus GLO-30 Public DEM, while `legacy` will use the DEM with the best coverage from ASF's legacy SRTM/NED datasets. | 'copernicus' |

Returns:

| Type | Description |
|---|---|
| Batch | A Batch object containing the RTC job |

Source code in hyp3_sdk/hyp3.py

def submit_rtc_job(self,
                   granule: str,
                   name: Optional[str] = None,
                   dem_matching: bool = False,
                   include_dem: bool = False,
                   include_inc_map: bool = False,
                   include_rgb: bool = False,
                   include_scattering_area: bool = False,
                   radiometry: Literal['sigma0', 'gamma0'] = 'gamma0',
                   resolution: Literal[30] = 30,
                   scale: Literal['amplitude', 'power'] = 'power',
                   speckle_filter: bool = False,
                   dem_name: Literal['copernicus', 'legacy'] = 'copernicus') -> Batch:
    """Submit an RTC job

    Args:
        granule: The granule (scene) to use
        name: A name for the job
        dem_matching: Coregisters SAR data to the DEM, rather than using dead reckoning based on orbit files
        include_dem: Include the DEM file in the product package
        include_inc_map: Include the local incidence angle map in the product package
        include_rgb: Include a false-color RGB decomposition in the product package for dual-pol granules
            (ignored for single-pol granules)
        include_scattering_area: Include the scattering area in the product package
        radiometry: Backscatter coefficient normalization, either by ground area (sigma0) or illuminated area
            projected into the look direction (gamma0)
        resolution: Desired output pixel spacing in meters
        scale: Scale of output image; either power or amplitude
        speckle_filter: Apply an Enhanced Lee speckle filter
        dem_name: Name of the DEM to use for processing. `copernicus` will use the Copernicus GLO-30 Public DEM,
            while `legacy` will use the DEM with the best coverage from ASF's legacy SRTM/NED datasets.

    Returns:
        A Batch object containing the RTC job
    """
    arguments = locals()
    arguments.pop('self')
    job_dict = self.prepare_rtc_job(**arguments)
    return self.submit_prepared_jobs(prepared_jobs=job_dict)
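A sketch of a single-scene submission with a placeholder granule name; all keyword arguments flow through prepare_rtc_job unchanged:

batch = hyp3.submit_rtc_job('GRANULE_1', name='rtc-example', scale='amplitude', include_dem=True)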
watch(self, job_or_batch, timeout=10800, interval=60)

Watch jobs until they complete

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| job_or_batch | Union[hyp3_sdk.jobs.Batch, hyp3_sdk.jobs.Job] | A Batch or Job object of jobs to watch | required |
| timeout | int | How long to wait until exiting in seconds | 10800 |
| interval | Union[int, float] | How often to check for updates in seconds | 60 |

Returns:

| Type | Description |
|---|---|
| Union[hyp3_sdk.jobs.Batch, hyp3_sdk.jobs.Job] | A Batch or Job object with refreshed watched jobs |

Source code in hyp3_sdk/hyp3.py

@singledispatchmethod
def watch(self, job_or_batch: Union[Batch, Job], timeout: int = 10800,
          interval: Union[int, float] = 60) -> Union[Batch, Job]:
    """Watch jobs until they complete

    Args:
        job_or_batch: A Batch or Job object of jobs to watch
        timeout: How long to wait until exiting in seconds
        interval: How often to check for updates in seconds

    Returns:
        A Batch or Job object with refreshed watched jobs
    """
    raise NotImplementedError(f'Cannot watch {type(job_or_batch)} type object')
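A typical end-to-end sketch tying submit, watch, and download together; the granule name is a placeholder, and watch raises HyP3Error if the timeout elapses before the jobs complete:

batch = hyp3.submit_rtc_job('GRANULE_1', name='rtc-example')
batch = hyp3.watch(batch, timeout=10800, interval=60)  # blocks, showing a progress bar
if batch.succeeded():
    product_files = batch.download_files('./products')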
jobs

Batch
Source code in hyp3_sdk/jobs.py
class Batch:
    def __init__(self, jobs: Optional[List[Job]] = None):
        if jobs is None:
            jobs = []
        self.jobs = jobs

    def __add__(self, other: Union[Job, 'Batch']):
        if isinstance(other, Batch):
            return Batch(self.jobs + other.jobs)
        elif isinstance(other, Job):
            return Batch(self.jobs + [other])
        else:
            raise TypeError(f"unsupported operand type(s) for +: '{type(self)}' and '{type(other)}'")

    def __iadd__(self, other: Union[Job, 'Batch']):
        if isinstance(other, Batch):
            self.jobs += other.jobs
        elif isinstance(other, Job):
            self.jobs += [other]
        else:
            raise TypeError(f"unsupported operand type(s) for +=: '{type(self)}' and '{type(other)}'")
        return self

    def __iter__(self):
        return iter(self.jobs)

    def __len__(self):
        return len(self.jobs)

    def __contains__(self, job: Job):
        return job in self.jobs

    def __eq__(self, other: 'Batch'):
        return self.jobs == other.jobs

    def __delitem__(self, job: int):
        self.jobs.pop(job)
        return self

    def __getitem__(self, index: int):
        if isinstance(index, slice):
            return Batch(self.jobs[index])
        return self.jobs[index]

    def __setitem__(self, index: int, job: Job):
        self.jobs[index] = job
        return self

    def __repr__(self):
        reprs = ", ".join([job.__repr__() for job in self.jobs])
        return f'Batch([{reprs}])'

    def __str__(self):
        count = self._count_statuses()
        return f'{len(self)} HyP3 Jobs: ' \
               f'{count["SUCCEEDED"]} succeeded, ' \
               f'{count["FAILED"]} failed, ' \
               f'{count["RUNNING"]} running, ' \
               f'{count["PENDING"]} pending.'

    def _count_statuses(self):
        return Counter([job.status_code for job in self.jobs])

    def complete(self) -> bool:
        """
        Returns: True if all jobs are complete, otherwise returns False
        """
        for job in self.jobs:
            if not job.complete():
                return False
        return True

    def succeeded(self) -> bool:
        """
        Returns: True if all jobs have succeeded, otherwise returns False
        """
        for job in self.jobs:
            if not job.succeeded():
                return False
        return True

    def download_files(self, location: Union[Path, str] = '.', create: bool = True) -> List[Path]:
        """
        Args:
            location: Directory location to put files into
            create: Create `location` if it does not point to an existing directory

        Returns: list of Path objects to downloaded files
        """
        downloaded_files = []
        tqdm = get_tqdm_progress_bar()
        for job in tqdm(self.jobs):
            try:
                downloaded_files.extend(job.download_files(location, create))
            except HyP3SDKError as e:
                print(f'Warning: {e}. Skipping download for {job}.')
        return downloaded_files

    def any_expired(self) -> bool:
        """Check succeeded jobs for expiration"""
        for job in self.jobs:
            try:
                if job.expired():
                    return True
            except HyP3SDKError:
                continue
        return False

    def filter_jobs(
            self, succeeded: bool = True, running: bool = True, failed: bool = False, include_expired: bool = True,
    ) -> 'Batch':
        """Filter jobs by status. By default, only succeeded and still running jobs will be in the returned batch.

        Args:
            succeeded: Include all succeeded jobs
            running: Include all running jobs
            failed: Include all failed jobs
            include_expired: Include expired jobs in the result

        Returns:
            batch: A batch object containing jobs matching all the selected statuses
        """
        filtered_jobs = []
        for job in self.jobs:
            if job.succeeded() and succeeded:
                if include_expired or not job.expired():
                    filtered_jobs.append(job)
            elif job.running() and running:
                filtered_jobs.append(job)
            elif job.failed() and failed:
                filtered_jobs.append(job)
        return Batch(filtered_jobs)
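Batch behaves like a simple container of Job objects. A sketch of the operators defined above, where job_a, job_b, and job_c are hypothetical Job instances:

from hyp3_sdk import Batch

batch = Batch([job_a, job_b])
batch += job_c                          # __iadd__ accepts a Job or another Batch
print(len(batch), job_a in batch)       # 3 True
running_only = batch.filter_jobs(succeeded=False, running=True)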
any_expired(self)

Check succeeded jobs for expiration
Source code in hyp3_sdk/jobs.py

def any_expired(self) -> bool:
    """Check succeeded jobs for expiration"""
    for job in self.jobs:
        try:
            if job.expired():
                return True
        except HyP3SDKError:
            continue
    return False
complete(self)

Returns: True if all jobs are complete, otherwise returns False
Source code in hyp3_sdk/jobs.py

def complete(self) -> bool:
    """
    Returns: True if all jobs are complete, otherwise returns False
    """
    for job in self.jobs:
        if not job.complete():
            return False
    return True
download_files(self, location='.', create=True)

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| location | Union[pathlib.Path, str] | Directory location to put files into | '.' |
| create | bool | Create `location` if it does not point to an existing directory | True |

Returns: list of Path objects to downloaded files
Source code in hyp3_sdk/jobs.py

def download_files(self, location: Union[Path, str] = '.', create: bool = True) -> List[Path]:
    """
    Args:
        location: Directory location to put files into
        create: Create `location` if it does not point to an existing directory

    Returns: list of Path objects to downloaded files
    """
    downloaded_files = []
    tqdm = get_tqdm_progress_bar()
    for job in tqdm(self.jobs):
        try:
            downloaded_files.extend(job.download_files(location, create))
        except HyP3SDKError as e:
            print(f'Warning: {e}. Skipping download for {job}.')
    return downloaded_files
filter_jobs(self, succeeded=True, running=True, failed=False, include_expired=True)

Filter jobs by status. By default, only succeeded and still running jobs will be in the returned batch.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| succeeded | bool | Include all succeeded jobs | True |
| running | bool | Include all running jobs | True |
| failed | bool | Include all failed jobs | False |
| include_expired | bool | Include expired jobs in the result | True |

Returns:

| Type | Description |
|---|---|
| Batch | A batch object containing jobs matching all the selected statuses |

Source code in hyp3_sdk/jobs.py

def filter_jobs(
        self, succeeded: bool = True, running: bool = True, failed: bool = False, include_expired: bool = True,
) -> 'Batch':
    """Filter jobs by status. By default, only succeeded and still running jobs will be in the returned batch.

    Args:
        succeeded: Include all succeeded jobs
        running: Include all running jobs
        failed: Include all failed jobs
        include_expired: Include expired jobs in the result

    Returns:
        batch: A batch object containing jobs matching all the selected statuses
    """
    filtered_jobs = []
    for job in self.jobs:
        if job.succeeded() and succeeded:
            if include_expired or not job.expired():
                filtered_jobs.append(job)
        elif job.running() and running:
            filtered_jobs.append(job)
        elif job.failed() and failed:
            filtered_jobs.append(job)
    return Batch(filtered_jobs)
succeeded(self)

Returns: True if all jobs have succeeded, otherwise returns False
Source code in hyp3_sdk/jobs.py

def succeeded(self) -> bool:
    """
    Returns: True if all jobs have succeeded, otherwise returns False
    """
    for job in self.jobs:
        if not job.succeeded():
            return False
    return True
Job
Source code in hyp3_sdk/jobs.py

class Job:
    _attributes_for_resubmit = {'name', 'job_parameters', 'job_type'}

    def __init__(
            self,
            job_type: str,
            job_id: str,
            request_time: datetime,
            status_code: str,
            user_id: str,
            name: Optional[str] = None,
            job_parameters: Optional[dict] = None,
            files: Optional[List] = None,
            logs: Optional[List] = None,
            browse_images: Optional[List] = None,
            thumbnail_images: Optional[List] = None,
            expiration_time: Optional[datetime] = None,
            processing_time_in_seconds: Optional[int] = None,
    ):
        self.job_id = job_id
        self.job_type = job_type
        self.request_time = request_time
        self.status_code = status_code
        self.user_id = user_id
        self.name = name
        self.job_parameters = job_parameters
        self.files = files
        self.logs = logs
        self.browse_images = browse_images
        self.thumbnail_images = thumbnail_images
        self.expiration_time = expiration_time
        self.processing_time_in_seconds = processing_time_in_seconds

    def __repr__(self):
        return f'Job.from_dict({self.to_dict()})'

    def __str__(self):
        return f'HyP3 {self.job_type} job {self.job_id}'

    def __eq__(self, other):
        return self.__dict__ == other.__dict__

    @staticmethod
    def from_dict(input_dict: dict):
        expiration_time = parse_date(input_dict['expiration_time']) if input_dict.get('expiration_time') else None
        return Job(
            job_type=input_dict['job_type'],
            job_id=input_dict['job_id'],
            request_time=parse_date(input_dict['request_time']),
            status_code=input_dict['status_code'],
            user_id=input_dict['user_id'],
            name=input_dict.get('name'),
            job_parameters=input_dict.get('job_parameters'),
            files=input_dict.get('files'),
            logs=input_dict.get('logs'),
            browse_images=input_dict.get('browse_images'),
            thumbnail_images=input_dict.get('thumbnail_images'),
            expiration_time=expiration_time,
            processing_time_in_seconds=input_dict.get('processing_time_in_seconds'),
        )

    def to_dict(self, for_resubmit: bool = False):
        job_dict = {}
        if for_resubmit:
            keys_to_process = Job._attributes_for_resubmit
        else:
            keys_to_process = vars(self).keys()

        for key in keys_to_process:
            value = self.__getattribute__(key)
            if value is not None:
                if isinstance(value, datetime):
                    job_dict[key] = value.isoformat(timespec='seconds')
                else:
                    job_dict[key] = value

        return job_dict

    def succeeded(self) -> bool:
        return self.status_code == 'SUCCEEDED'

    def failed(self) -> bool:
        return self.status_code == 'FAILED'

    def complete(self) -> bool:
        return self.succeeded() or self.failed()

    # TODO may want to update this to check if status code is actually RUNNING, because currently this also returns
    #  true if status is PENDING
    def running(self) -> bool:
        return not self.complete()

    def expired(self) -> bool:
        return self.expiration_time is not None and datetime.now(tz.UTC) >= self.expiration_time

    def download_files(self, location: Union[Path, str] = '.', create: bool = True) -> List[Path]:
        """
        Args:
            location: Directory location to put files into
            create: Create `location` if it does not point to an existing directory

        Returns: list of Path objects to downloaded files
        """
        location = Path(location)

        if not self.succeeded():
            raise HyP3SDKError(f'Only succeeded jobs can be downloaded; job is {self.status_code}.')
        if self.expired():
            raise HyP3SDKError(f'Expired jobs cannot be downloaded; '
                               f'job expired {self.expiration_time.isoformat(timespec="seconds")}.')

        if create:
            location.mkdir(parents=True, exist_ok=True)
        elif not location.is_dir():
            raise NotADirectoryError(str(location))

        downloaded_files = []
        for file in self.files:
            download_url = file['url']
            filename = location / file['filename']
            try:
                downloaded_files.append(download_file(download_url, filename, chunk_size=10485760))
            except HTTPError:
                raise HyP3SDKError(f'Unable to download file: {download_url}')
        return downloaded_files
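Per the source above, to_dict(for_resubmit=True) keeps only name, job_parameters, and job_type, which is exactly the shape submit_prepared_jobs accepts. A sketch of resubmitting an earlier job, assuming `hyp3` is an authenticated HyP3 instance and `job_id` comes from a previous run:

old_job = hyp3.get_job_by_id(job_id)
new_batch = hyp3.submit_prepared_jobs(old_job.to_dict(for_resubmit=True))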
download_files(self, location='.', create=True)

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| location | Union[pathlib.Path, str] | Directory location to put files into | '.' |
| create | bool | Create `location` if it does not point to an existing directory | True |

Returns: list of Path objects to downloaded files
Source code in hyp3_sdk/jobs.py

def download_files(self, location: Union[Path, str] = '.', create: bool = True) -> List[Path]:
    """
    Args:
        location: Directory location to put files into
        create: Create `location` if it does not point to an existing directory

    Returns: list of Path objects to downloaded files
    """
    location = Path(location)

    if not self.succeeded():
        raise HyP3SDKError(f'Only succeeded jobs can be downloaded; job is {self.status_code}.')
    if self.expired():
        raise HyP3SDKError(f'Expired jobs cannot be downloaded; '
                           f'job expired {self.expiration_time.isoformat(timespec="seconds")}.')

    if create:
        location.mkdir(parents=True, exist_ok=True)
    elif not location.is_dir():
        raise NotADirectoryError(str(location))

    downloaded_files = []
    for file in self.files:
        download_url = file['url']
        filename = location / file['filename']
        try:
            downloaded_files.append(download_file(download_url, filename, chunk_size=10485760))
        except HTTPError:
            raise HyP3SDKError(f'Unable to download file: {download_url}')
    return downloaded_files
util
Extra utilities for working with HyP3

chunk(itr, n=200)

Split a sequence into small chunks

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| itr | Sequence[Any] | A sequence object to chunk | required |
| n | int | Size of the chunks to return | 200 |

Source code in hyp3_sdk/util.py

def chunk(itr: Sequence[Any], n: int = 200) -> Generator[Sequence[Any], None, None]:
    """Split a sequence into small chunks

    Args:
        itr: A sequence object to chunk
        n: Size of the chunks to return
    """
    if not isinstance(n, int) or n < 1:
        raise ValueError(f'n must be a positive integer: {n}')

    for i in range(0, len(itr), n):
        yield itr[i:i + n]
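chunk pairs naturally with submit_prepared_jobs to keep each POST payload to a bounded number of jobs. A sketch, assuming prepared_jobs is a long list of prepared job dictionaries and `hyp3` is an authenticated HyP3 instance:

from hyp3_sdk import Batch
from hyp3_sdk.util import chunk

batch = Batch()
for job_chunk in chunk(prepared_jobs, n=200):
    batch += hyp3.submit_prepared_jobs(job_chunk)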
download_file(url, filepath, chunk_size=None, retries=2, backoff_factor=1)

Download a file

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| url | str | URL of the file to download | required |
| filepath | Union[pathlib.Path, str] | Location to place file into | required |
| chunk_size | | Size to chunk the download into | None |
| retries | | Number of retries to attempt | 2 |
| backoff_factor | | Factor for calculating time between retries | 1 |

Returns:

| Type | Description |
|---|---|
| Path | The path to the downloaded file |

Source code in hyp3_sdk/util.py

def download_file(url: str, filepath: Union[Path, str], chunk_size=None, retries=2, backoff_factor=1) -> Path:
    """Download a file

    Args:
        url: URL of the file to download
        filepath: Location to place file into
        chunk_size: Size to chunk the download into
        retries: Number of retries to attempt
        backoff_factor: Factor for calculating time between retries

    Returns:
        download_path: The path to the downloaded file
    """
    filepath = Path(filepath)

    session = requests.Session()
    retry_strategy = Retry(
        total=retries,
        backoff_factor=backoff_factor,
        status_forcelist=[429, 500, 502, 503, 504],
    )
    session.mount('https://', HTTPAdapter(max_retries=retry_strategy))
    session.mount('http://', HTTPAdapter(max_retries=retry_strategy))

    stream = False if chunk_size is None else True
    with session.get(url, stream=stream) as s:
        s.raise_for_status()
        tqdm = get_tqdm_progress_bar()
        with tqdm.wrapattr(open(filepath, "wb"), 'write', miniters=1, desc=filepath.name,
                           total=int(s.headers.get('content-length', 0))) as f:
            for chunk in s.iter_content(chunk_size=chunk_size):
                if chunk:
                    f.write(chunk)
    session.close()

    return filepath
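A sketch of a direct download; the URL is a placeholder, and a 10 MiB chunk_size streams the response instead of buffering it whole:

from hyp3_sdk.util import download_file

zip_path = download_file('https://example.com/product.zip', 'product.zip', chunk_size=10485760)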
extract_zipped_product(zip_file, delete=True)

Extract a zipped HyP3 product

Extract a zipped HyP3 product to the same directory as the zipped HyP3 product, optionally deleting `zip_file` afterward.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| zip_file | Union[str, pathlib.Path] | Zipped HyP3 product to extract | required |
| delete | bool | Delete `zip_file` after it has been extracted | True |

Returns:

| Type | Description |
|---|---|
| Path | Path to the HyP3 product folder containing the product files |

Source code in hyp3_sdk/util.py

def extract_zipped_product(zip_file: Union[str, Path], delete: bool = True) -> Path:
    """Extract a zipped HyP3 product

    Extract a zipped HyP3 product to the same directory as the zipped HyP3 product, optionally
    deleting `zip_file` afterward.

    Args:
        zip_file: Zipped HyP3 product to extract
        delete: Delete `zip_file` after it has been extracted

    Returns:
        Path to the HyP3 product folder containing the product files
    """
    zip_file = Path(zip_file)
    with ZipFile(zip_file) as z:
        z.extractall(path=zip_file.parent)

    if delete:
        zip_file.unlink()

    return zip_file.parent / zip_file.stem
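Continuing the download_file sketch above, the downloaded archive can be unpacked in place; delete=True removes the zip afterward:

from hyp3_sdk.util import extract_zipped_product

product_dir = extract_zipped_product(zip_path, delete=True)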
get_authenticated_session(username, password)

Log into HyP3 using credentials for `urs.earthdata.nasa.gov` from either the provided credentials or a `.netrc` file.

Returns:

| Type | Description |
|---|---|
| Session | An authenticated HyP3 Session |

Source code in hyp3_sdk/util.py

def get_authenticated_session(username: str, password: str) -> requests.Session:
    """Log into HyP3 using credentials for `urs.earthdata.nasa.gov` from either the provided
    credentials or a `.netrc` file.

    Returns:
        An authenticated HyP3 Session
    """
    s = requests.Session()
    if hyp3_sdk.TESTING:
        return s

    if username is not None and password is not None:
        response = s.get(AUTH_URL, auth=(username, password))
        try:
            response.raise_for_status()
        except requests.HTTPError:
            raise AuthenticationError('Was not able to authenticate with credentials provided\n'
                                      'This could be due to invalid credentials or a connection error.')
    else:
        response = s.get(AUTH_URL)
        try:
            response.raise_for_status()
        except requests.HTTPError:
            raise AuthenticationError('Was not able to authenticate with .netrc file and no credentials provided\n'
                                      'This could be due to invalid credentials in .netrc or a connection error.')
    return s