Skip to content

hyp3_sdk v1.4.0 API Reference

A python wrapper around the HyP3 API

get_metadata(granules)

Get the metadata for a granule or list of granules

Parameters:

Name Type Description Default
granules Union[str, Iterable[str]]

granule(s) to lookup metadata for

required

Returns:

Type Description
metadata

metadata for the granule(s)

Source code in hyp3_sdk/asf_search.py
def get_metadata(granules: Union[str, Iterable[str]]) -> Union[dict, List[dict]]:
    """Look up search metadata for one granule or for several granules

    Args:
        granules: a single granule name, or an iterable of granule names
            (the names are comma-joined into a single search request)

    Returns:
        metadata: the first matching result when `granules` is a single string,
            otherwise the list of all matching results

    Raises:
        IndexError: if `granules` is a single string and the search returned
            no results other than `METADATA_*` products
    """
    single_granule = isinstance(granules, str)
    granule_list = granules if single_granule else ','.join(granules)

    response = requests.post(
        _SEARCH_API,
        params={
            'output': 'jsonlite',
            'granule_list': granule_list,
        },
    )
    _raise_for_search_status(response)

    # Results whose productType starts with METADATA_ are not returned to the caller
    metadata = []
    for result in response.json()['results']:
        if result['productType'].startswith('METADATA_'):
            continue
        metadata.append(result)

    return metadata[0] if single_granule else metadata

get_nearest_neighbors(granule, max_neighbors=2)

Get a Sentinel-1 granule's nearest neighbors from a temporal stack (backwards in time)

Parameters:

Name Type Description Default
granule str

reference granule

required
max_neighbors int

maximum number of neighbors to return

2

Returns:

Type Description
neighbors

a list of neighbors sorted by time

Source code in hyp3_sdk/asf_search.py
def get_nearest_neighbors(granule: str, max_neighbors: int = 2) -> List[dict]:
    """Find the scenes that immediately precede a Sentinel-1 granule in its temporal stack

    Two searches are made: the first resolves the reference granule, the second
    finds scenes over the reference's center point that share its acquisition
    parameters and end no later than the reference's start time.

    Args:
        granule: reference granule
        max_neighbors: maximum number of neighbors to return

    Returns:
        neighbors: up to `max_neighbors` search results, newest first,
            not including the reference scene itself

    Raises:
        ASFSearchError: if the reference granule is not found by the first search
    """
    lookup = requests.post(
        _SEARCH_API,
        params={
            'output': 'json',  # jsonlite doesn't include centerLat/centerLon
            'platform': 'S1',
            'granule_list': granule,
        },
    )
    _raise_for_search_status(lookup)

    references = []
    for candidate in lookup.json()[0]:
        if not candidate['processingLevel'].startswith('METADATA_'):
            references.append(candidate)
    if len(references) == 0:
        raise ASFSearchError(f'Reference Sentinel-1 granule {granule} could not be found')
    reference = references[0]

    center_point = f'POINT ({reference["centerLon"]} {reference["centerLat"]})'
    stack = requests.post(
        _SEARCH_API,
        params={
            'output': 'jsonlite',
            'platform': 'S1',
            'intersectsWith': center_point,
            'end': reference['startTime'],  # includes reference scene
            'beamMode': reference['beamMode'],
            'flightDirection': reference['flightDirection'],
            'processingLevel': reference['processingLevel'],
            'relativeOrbit': reference['relativeOrbit'],
            'polarization': _get_matching_polarizations(reference['polarization']),
            'lookDirection': reference['lookDirection'],
        },
    )
    _raise_for_search_status(stack)

    neighbors = list(stack.json()['results'])
    neighbors.sort(key=lambda scene: scene['startTime'], reverse=True)
    # Index 0 is skipped: the search window ends at the reference's start time,
    # so the newest result is the reference scene
    return neighbors[1:1 + max_neighbors]

exceptions

Errors and exceptions to raise when the SDK runs into problems

ASFSearchError (HyP3SDKError)

Raise for errors when using the ASF Search module

AuthenticationError (HyP3SDKError)

Raise when authentication does not succeed

HyP3Error (HyP3SDKError)

Raise for errors when using the HyP3 module

HyP3SDKError (Exception)

Base Exception for the HyP3 SDK

ServerError (HyP3SDKError)

Raise when the HyP3 SDK encounters a server error

hyp3

HyP3

A python wrapper around the HyP3 API

__init__(self, api_url='https://hyp3-api.asf.alaska.edu', username=None, password=None, prompt=False) special

Parameters:

Name Type Description Default
api_url str

Address of the HyP3 API

'https://hyp3-api.asf.alaska.edu'
username Optional[str]

Username for authenticating to urs.earthdata.nasa.gov. Both username and password must be provided if either is provided.

None
password Optional[str]

Password for authenticating to urs.earthdata.nasa.gov. Both username and password must be provided if either is provided.

None
prompt bool

Prompt for username and/or password interactively when they are not provided as keyword parameters

False
Source code in hyp3_sdk/hyp3.py
def __init__(self, api_url: str = PROD_API, username: Optional[str] = None, password: Optional[str] = None,
             prompt: bool = False):
    """Store the API address and open an authenticated session

    Args:
        api_url: Address of the HyP3 API
        username: Username for authenticating to `urs.earthdata.nasa.gov`.
            Both username and password must be provided if either is provided.
        password: Password for authenticating to `urs.earthdata.nasa.gov`.
            Both username and password must be provided if either is provided.
        prompt: Prompt for username and/or password interactively when they
            are not provided as keyword parameters
    """
    self.url = api_url

    if prompt:
        # Only the credentials that were not passed in are asked for
        if username is None:
            username = input('NASA Earthdata Login username: ')
        if password is None:
            password = getpass('NASA Earthdata Login password: ')

    self.session = get_authenticated_session(username, password)

    # Identify the SDK and its version on every request made through this session
    user_agent = f'{hyp3_sdk.__name__}/{hyp3_sdk.__version__}'
    self.session.headers.update({'User-Agent': user_agent})

check_quota(self)

Returns:

Type Description
int

The number of jobs left in your quota

Source code in hyp3_sdk/hyp3.py
def check_quota(self) -> int:
    """Report how many jobs are left in your quota

    Returns:
        The `remaining` entry of the `quota` section returned by `my_info()`
    """
    quota = self.my_info()['quota']
    return quota['remaining']

find_jobs(self, start=None, end=None, status_code=None, name=None, job_type=None)

Gets a Batch of jobs from HyP3 matching the provided search criteria

Parameters:

Name Type Description Default
start Optional[datetime.datetime]

only jobs submitted after given time

None
end Optional[datetime.datetime]

only jobs submitted before given time

None
status_code Optional[str]

only jobs matching this status (SUCCEEDED, FAILED, RUNNING, PENDING)

None
name Optional[str]

only jobs with this name

None
job_type Optional[str]

only jobs with this job_type

None

Returns:

Type Description
Batch

A Batch object containing the found jobs

Source code in hyp3_sdk/hyp3.py
def find_jobs(self, start: Optional[datetime] = None, end: Optional[datetime] = None,
              status_code: Optional[str] = None, name: Optional[str] = None,
              job_type: Optional[str] = None) -> Batch:
    """Gets a Batch of jobs from HyP3 matching the provided search criteria

    Criteria left as `None` are not sent to the API. Datetime values without
    timezone information are treated as UTC, and all datetimes are sent as
    ISO 8601 strings with second precision. Every page of results is fetched
    by following the `next` link in each response.

    Args:
        start: only jobs submitted after given time
        end: only jobs submitted before given time
        status_code: only jobs matching this status (SUCCEEDED, FAILED, RUNNING, PENDING)
        name: only jobs with this name
        job_type: only jobs with this job_type

    Returns:
        A Batch object containing the found jobs
    """
    # Explicit mapping instead of introspecting locals(); order matches the signature
    criteria = {
        'start': start,
        'end': end,
        'status_code': status_code,
        'name': name,
        'job_type': job_type,
    }

    params = {}
    for param_name, param_value in criteria.items():
        if param_value is None:
            continue
        if isinstance(param_value, datetime):
            if param_value.tzinfo is None:
                param_value = param_value.replace(tzinfo=timezone.utc)
            param_value = param_value.isoformat(timespec='seconds')
        params[param_name] = param_value

    response = self.session.get(urljoin(self.url, '/jobs'), params=params)
    _raise_for_hyp3_status(response)
    # Decode each page once; every response.json() call re-parses the body
    page = response.json()
    jobs = [Job.from_dict(job) for job in page['jobs']]

    while 'next' in page:
        response = self.session.get(page['next'])
        _raise_for_hyp3_status(response)
        page = response.json()
        jobs.extend(Job.from_dict(job) for job in page['jobs'])

    return Batch(jobs)

get_job_by_id(self, job_id)

Get job by job ID

Parameters:

Name Type Description Default
job_id str

A job ID

required

Returns:

Type Description
Job

A Job object

Source code in hyp3_sdk/hyp3.py
def get_job_by_id(self, job_id: str) -> Job:
    """Fetch a single job from the API by its ID

    Args:
        job_id: A job ID

    Returns:
        A Job object built from the API response
    """
    job_url = urljoin(self.url, f'/jobs/{job_id}')
    response = self.session.get(job_url)
    _raise_for_hyp3_status(response)
    job_info = response.json()
    return Job.from_dict(job_info)

my_info(self)

Returns:

Type Description
dict

Your user information

Source code in hyp3_sdk/hyp3.py
def my_info(self) -> dict:
    """Fetch your user information from the API's `/user` endpoint

    Returns:
        Your user information
    """
    user_url = urljoin(self.url, '/user')
    response = self.session.get(user_url)
    _raise_for_hyp3_status(response)
    user_info = response.json()
    return user_info

prepare_autorift_job(granule1, granule2, name=None) classmethod

Prepare an autoRIFT job

Parameters:

Name Type Description Default
granule1 str

The first granule (scene) to use

required
granule2 str

The second granule (scene) to use

required
name Optional[str]

A name for the job

None

Returns:

Type Description
dict

A dictionary containing the prepared autoRIFT job

Source code in hyp3_sdk/hyp3.py
@classmethod
def prepare_autorift_job(cls, granule1: str, granule2: str, name: Optional[str] = None) -> dict:
    """Prepare (but do not submit) an autoRIFT job

    Args:
        granule1: The first granule (scene) to use
        granule2: The second granule (scene) to use
        name: A name for the job; the `name` key is left out when this is None

    Returns:
        A dictionary containing the prepared autoRIFT job
    """
    prepared = {
        'job_parameters': {'granules': [granule1, granule2]},
        'job_type': 'AUTORIFT',
    }
    if name is None:
        return prepared
    return {**prepared, 'name': name}

prepare_insar_job(granule1, granule2, name=None, include_look_vectors=False, include_los_displacement=False, include_inc_map=False, looks='20x4', include_dem=False, include_wrapped_phase=False, apply_water_mask=False, include_displacement_maps=False) classmethod

Prepare an InSAR job

Parameters:

Name Type Description Default
granule1 str

The first granule (scene) to use

required
granule2 str

The second granule (scene) to use

required
name Optional[str]

A name for the job

None
include_look_vectors bool

Include the look vector theta and phi files in the product package

False
include_los_displacement bool

Include a GeoTIFF in the product package containing displacement values along the Line-Of-Sight (LOS). This parameter has been deprecated in favor of include_displacement_maps, and will be removed in a future release.

False
include_inc_map bool

Include the local and ellipsoidal incidence angle maps in the product package

False
looks Literal['20x4', '10x2']

Number of looks to take in range and azimuth

'20x4'
include_dem bool

Include the digital elevation model GeoTIFF in the product package

False
include_wrapped_phase bool

Include the wrapped phase GeoTIFF in the product package

False
apply_water_mask bool

Sets pixels over coastal waters and large inland waterbodies as invalid for phase unwrapping

False
include_displacement_maps bool

Include displacement maps (line-of-sight and vertical) in the product package

False

Returns:

Type Description
dict

A dictionary containing the prepared InSAR job

Source code in hyp3_sdk/hyp3.py
@classmethod
def prepare_insar_job(cls,
                      granule1: str,
                      granule2: str,
                      name: Optional[str] = None,
                      include_look_vectors: bool = False,
                      include_los_displacement: bool = False,
                      include_inc_map: bool = False,
                      looks: Literal['20x4', '10x2'] = '20x4',
                      include_dem: bool = False,
                      include_wrapped_phase: bool = False,
                      apply_water_mask: bool = False,
                      include_displacement_maps: bool = False) -> dict:
    """Prepare (but do not submit) an InSAR job

    Args:
        granule1: The first granule (scene) to use
        granule2: The second granule (scene) to use
        name: A name for the job; the `name` key is left out when this is None
        include_look_vectors: Include the look vector theta and phi files in the product package
        include_los_displacement: Include a GeoTIFF in the product package containing displacement values
            along the Line-Of-Sight (LOS). This parameter has been deprecated in favor of
            `include_displacement_maps`, and will be removed in a future release.
            A FutureWarning is issued when it is set.
        include_inc_map: Include the local and ellipsoidal incidence angle maps in the product package
        looks: Number of looks to take in range and azimuth
        include_dem: Include the digital elevation model GeoTIFF in the product package
        include_wrapped_phase: Include the wrapped phase GeoTIFF in the product package
        apply_water_mask: Sets pixels over coastal waters and large inland waterbodies
            as invalid for phase unwrapping
        include_displacement_maps: Include displacement maps (line-of-sight and vertical) in the product package

    Returns:
        A dictionary containing the prepared InSAR job
    """
    if include_los_displacement:
        warnings.warn('The include_los_displacement parameter has been deprecated in favor of '
                      'include_displacement_maps, and will be removed in a future release.', FutureWarning)

    # Every option is forwarded as a job parameter, in signature order
    parameters = {
        'granules': [granule1, granule2],
        'include_look_vectors': include_look_vectors,
        'include_los_displacement': include_los_displacement,
        'include_inc_map': include_inc_map,
        'looks': looks,
        'include_dem': include_dem,
        'include_wrapped_phase': include_wrapped_phase,
        'apply_water_mask': apply_water_mask,
        'include_displacement_maps': include_displacement_maps,
    }
    prepared = {'job_parameters': parameters, 'job_type': 'INSAR_GAMMA'}
    if name is not None:
        prepared['name'] = name
    return prepared

prepare_rtc_job(granule, name=None, dem_matching=False, include_dem=False, include_inc_map=False, include_rgb=False, include_scattering_area=False, radiometry='gamma0', resolution=30, scale='power', speckle_filter=False, dem_name='copernicus') classmethod

Prepare an RTC job

Parameters:

Name Type Description Default
granule str

The granule (scene) to use

required
name Optional[str]

A name for the job

None
dem_matching bool

Coregisters SAR data to the DEM, rather than using dead reckoning based on orbit files

False
include_dem bool

Include the DEM file in the product package

False
include_inc_map bool

Include the local incidence angle map in the product package

False
include_rgb bool

Include a false-color RGB decomposition in the product package for dual-pol granules (ignored for single-pol granules)

False
include_scattering_area bool

Include the scattering area in the product package

False
radiometry Literal['sigma0', 'gamma0']

Backscatter coefficient normalization, either by ground area (sigma0) or illuminated area projected into the look direction (gamma0)

'gamma0'
resolution Literal[30]

Desired output pixel spacing in meters

30
scale Literal['amplitude', 'power']

Scale of output image; either power or amplitude

'power'
speckle_filter bool

Apply an Enhanced Lee speckle filter

False
dem_name Literal['copernicus', 'legacy']

Name of the DEM to use for processing. copernicus will use the Copernicus GLO-30 Public DEM, while legacy will use the DEM with the best coverage from ASF's legacy SRTM/NED datasets.

'copernicus'

Returns:

Type Description
dict

A dictionary containing the prepared RTC job

Source code in hyp3_sdk/hyp3.py
@classmethod
def prepare_rtc_job(cls,
                    granule: str,
                    name: Optional[str] = None,
                    dem_matching: bool = False,
                    include_dem: bool = False,
                    include_inc_map: bool = False,
                    include_rgb: bool = False,
                    include_scattering_area: bool = False,
                    radiometry: Literal['sigma0', 'gamma0'] = 'gamma0',
                    resolution: Literal[30] = 30,
                    scale: Literal['amplitude', 'power'] = 'power',
                    speckle_filter: bool = False,
                    dem_name: Literal['copernicus', 'legacy'] = 'copernicus') -> dict:
    """Prepare (but do not submit) an RTC job

    Args:
        granule: The granule (scene) to use
        name: A name for the job; the `name` key is left out when this is None
        dem_matching: Coregisters SAR data to the DEM, rather than using dead reckoning based on orbit files
        include_dem: Include the DEM file in the product package
        include_inc_map: Include the local incidence angle map in the product package
        include_rgb: Include a false-color RGB decomposition in the product package for dual-pol granules
            (ignored for single-pol granules)
        include_scattering_area: Include the scattering area in the product package
        radiometry: Backscatter coefficient normalization, either by ground area (sigma0) or illuminated area
            projected into the look direction (gamma0)
        resolution: Desired output pixel spacing in meters
        scale: Scale of output image; either power or amplitude
        speckle_filter: Apply an Enhanced Lee speckle filter
        dem_name: Name of the DEM to use for processing.  `copernicus` will use the Copernicus GLO-30 Public DEM,
            while `legacy` will use the DEM with the best coverage from ASF's legacy SRTM/NED datasets.

    Returns:
        A dictionary containing the prepared RTC job
    """
    # Every option is forwarded as a job parameter, in signature order
    parameters = {
        'granules': [granule],
        'dem_matching': dem_matching,
        'include_dem': include_dem,
        'include_inc_map': include_inc_map,
        'include_rgb': include_rgb,
        'include_scattering_area': include_scattering_area,
        'radiometry': radiometry,
        'resolution': resolution,
        'scale': scale,
        'speckle_filter': speckle_filter,
        'dem_name': dem_name,
    }
    prepared = {'job_parameters': parameters, 'job_type': 'RTC_GAMMA'}
    if name is not None:
        prepared['name'] = name
    return prepared

refresh(self, job_or_batch)

Refresh each job's information

Parameters:

Name Type Description Default
job_or_batch Union[hyp3_sdk.jobs.Batch, hyp3_sdk.jobs.Job]

A Batch or Job object to refresh

required

Returns:

Type Description
Union[hyp3_sdk.jobs.Batch, hyp3_sdk.jobs.Job]

A Batch or Job object with refreshed information

Source code in hyp3_sdk/hyp3.py
@singledispatchmethod
def refresh(self, job_or_batch: Union[Batch, Job]) -> Union[Batch, Job]:
    """Refresh each job's information

    This is the fallback of a single-dispatch method: it is reached only for
    argument types without a registered implementation, and always raises.

    Args:
        job_or_batch: A Batch or Job object to refresh

    Returns:
        A Batch or Job object with refreshed information

    Raises:
        NotImplementedError: if `job_or_batch` is of an unsupported type
    """
    message = f'Cannot refresh {type(job_or_batch)} type object'
    raise NotImplementedError(message)

submit_autorift_job(self, granule1, granule2, name=None)

Submit an autoRIFT job

Parameters:

Name Type Description Default
granule1 str

The first granule (scene) to use

required
granule2 str

The second granule (scene) to use

required
name Optional[str]

A name for the job

None

Returns:

Type Description
Batch

A Batch object containing the autoRIFT job

Source code in hyp3_sdk/hyp3.py
def submit_autorift_job(self, granule1: str, granule2: str, name: Optional[str] = None) -> Batch:
    """Prepare an autoRIFT job and submit it

    Args:
        granule1: The first granule (scene) to use
        granule2: The second granule (scene) to use
        name: A name for the job

    Returns:
        A Batch object containing the autoRIFT job
    """
    return self.submit_prepared_jobs(
        prepared_jobs=self.prepare_autorift_job(granule1, granule2, name=name),
    )

submit_insar_job(self, granule1, granule2, name=None, include_look_vectors=False, include_los_displacement=False, include_inc_map=False, looks='20x4', include_dem=False, include_wrapped_phase=False, apply_water_mask=False, include_displacement_maps=False)

Submit an InSAR job

Parameters:

Name Type Description Default
granule1 str

The first granule (scene) to use

required
granule2 str

The second granule (scene) to use

required
name Optional[str]

A name for the job

None
include_look_vectors bool

Include the look vector theta and phi files in the product package

False
include_los_displacement bool

Include a GeoTIFF in the product package containing displacement values along the Line-Of-Sight (LOS). This parameter has been deprecated in favor of include_displacement_maps, and will be removed in a future release.

False
include_inc_map bool

Include the local and ellipsoidal incidence angle maps in the product package

False
looks Literal['20x4', '10x2']

Number of looks to take in range and azimuth

'20x4'
include_dem bool

Include the digital elevation model GeoTIFF in the product package

False
include_wrapped_phase bool

Include the wrapped phase GeoTIFF in the product package

False
apply_water_mask bool

Sets pixels over coastal waters and large inland waterbodies as invalid for phase unwrapping

False
include_displacement_maps bool

Include displacement maps (line-of-sight and vertical) in the product package

False

Returns:

Type Description
Batch

A Batch object containing the InSAR job

Source code in hyp3_sdk/hyp3.py
def submit_insar_job(self,
                     granule1: str,
                     granule2: str,
                     name: Optional[str] = None,
                     include_look_vectors: bool = False,
                     include_los_displacement: bool = False,
                     include_inc_map: bool = False,
                     looks: Literal['20x4', '10x2'] = '20x4',
                     include_dem: bool = False,
                     include_wrapped_phase: bool = False,
                     apply_water_mask: bool = False,
                     include_displacement_maps: bool = False) -> Batch:
    """Prepare an InSAR job and submit it

    All arguments are passed through, by keyword, to `prepare_insar_job`.

    Args:
        granule1: The first granule (scene) to use
        granule2: The second granule (scene) to use
        name: A name for the job
        include_look_vectors: Include the look vector theta and phi files in the product package
        include_los_displacement: Include a GeoTIFF in the product package containing displacement values
            along the Line-Of-Sight (LOS). This parameter has been deprecated in favor of
            `include_displacement_maps`, and will be removed in a future release.
        include_inc_map: Include the local and ellipsoidal incidence angle maps in the product package
        looks: Number of looks to take in range and azimuth
        include_dem: Include the digital elevation model GeoTIFF in the product package
        include_wrapped_phase: Include the wrapped phase GeoTIFF in the product package
        apply_water_mask: Sets pixels over coastal waters and large inland waterbodies
            as invalid for phase unwrapping
        include_displacement_maps: Include displacement maps (line-of-sight and vertical) in the product package

    Returns:
        A Batch object containing the InSAR job
    """
    prepared = self.prepare_insar_job(
        granule1=granule1,
        granule2=granule2,
        name=name,
        include_look_vectors=include_look_vectors,
        include_los_displacement=include_los_displacement,
        include_inc_map=include_inc_map,
        looks=looks,
        include_dem=include_dem,
        include_wrapped_phase=include_wrapped_phase,
        apply_water_mask=apply_water_mask,
        include_displacement_maps=include_displacement_maps,
    )
    return self.submit_prepared_jobs(prepared_jobs=prepared)

submit_prepared_jobs(self, prepared_jobs)

Submit a prepared job dictionary, or list of prepared job dictionaries

Parameters:

Name Type Description Default
prepared_jobs Union[dict, List[dict]]

A prepared job dictionary, or list of prepared job dictionaries

required

Returns:

Type Description
Batch

A Batch object containing the submitted job(s)

Source code in hyp3_sdk/hyp3.py
def submit_prepared_jobs(self, prepared_jobs: Union[dict, List[dict]]) -> Batch:
    """Submit one prepared job dictionary, or a list of them, in a single request

    Args:
        prepared_jobs: A prepared job dictionary, or list of prepared job dictionaries

    Returns:
        A Batch object containing the submitted job(s), built from the API response
    """
    # A lone job dictionary is wrapped so the payload always carries a list
    jobs_to_submit = [prepared_jobs] if isinstance(prepared_jobs, dict) else prepared_jobs

    jobs_url = urljoin(self.url, '/jobs')
    response = self.session.post(jobs_url, json={'jobs': jobs_to_submit})
    _raise_for_hyp3_status(response)

    batch = Batch()
    for submitted in response.json()['jobs']:
        batch += Job.from_dict(submitted)
    return batch

submit_rtc_job(self, granule, name=None, dem_matching=False, include_dem=False, include_inc_map=False, include_rgb=False, include_scattering_area=False, radiometry='gamma0', resolution=30, scale='power', speckle_filter=False, dem_name='copernicus')

Submit an RTC job

Parameters:

Name Type Description Default
granule str

The granule (scene) to use

required
name Optional[str]

A name for the job

None
dem_matching bool

Coregisters SAR data to the DEM, rather than using dead reckoning based on orbit files

False
include_dem bool

Include the DEM file in the product package

False
include_inc_map bool

Include the local incidence angle map in the product package

False
include_rgb bool

Include a false-color RGB decomposition in the product package for dual-pol granules (ignored for single-pol granules)

False
include_scattering_area bool

Include the scattering area in the product package

False
radiometry Literal['sigma0', 'gamma0']

Backscatter coefficient normalization, either by ground area (sigma0) or illuminated area projected into the look direction (gamma0)

'gamma0'
resolution Literal[30]

Desired output pixel spacing in meters

30
scale Literal['amplitude', 'power']

Scale of output image; either power or amplitude

'power'
speckle_filter bool

Apply an Enhanced Lee speckle filter

False
dem_name Literal['copernicus', 'legacy']

Name of the DEM to use for processing. copernicus will use the Copernicus GLO-30 Public DEM, while legacy will use the DEM with the best coverage from ASF's legacy SRTM/NED datasets.

'copernicus'

Returns:

Type Description
Batch

A Batch object containing the RTC job

Source code in hyp3_sdk/hyp3.py
def submit_rtc_job(self,
                   granule: str,
                   name: Optional[str] = None,
                   dem_matching: bool = False,
                   include_dem: bool = False,
                   include_inc_map: bool = False,
                   include_rgb: bool = False,
                   include_scattering_area: bool = False,
                   radiometry: Literal['sigma0', 'gamma0'] = 'gamma0',
                   resolution: Literal[30] = 30,
                   scale: Literal['amplitude', 'power'] = 'power',
                   speckle_filter: bool = False,
                   dem_name: Literal['copernicus', 'legacy'] = 'copernicus') -> Batch:
    """Prepare an RTC job and submit it

    All arguments are passed through, by keyword, to `prepare_rtc_job`.

    Args:
        granule: The granule (scene) to use
        name: A name for the job
        dem_matching: Coregisters SAR data to the DEM, rather than using dead reckoning based on orbit files
        include_dem: Include the DEM file in the product package
        include_inc_map: Include the local incidence angle map in the product package
        include_rgb: Include a false-color RGB decomposition in the product package for dual-pol granules
            (ignored for single-pol granules)
        include_scattering_area: Include the scattering area in the product package
        radiometry: Backscatter coefficient normalization, either by ground area (sigma0) or illuminated area
            projected into the look direction (gamma0)
        resolution: Desired output pixel spacing in meters
        scale: Scale of output image; either power or amplitude
        speckle_filter: Apply an Enhanced Lee speckle filter
        dem_name: Name of the DEM to use for processing.  `copernicus` will use the Copernicus GLO-30 Public DEM,
            while `legacy` will use the DEM with the best coverage from ASF's legacy SRTM/NED datasets.

    Returns:
        A Batch object containing the RTC job
    """
    # Arguments are forwarded explicitly rather than by mutating the mapping
    # returned by locals(), whose modification is not supported by the language
    job_dict = self.prepare_rtc_job(
        granule=granule,
        name=name,
        dem_matching=dem_matching,
        include_dem=include_dem,
        include_inc_map=include_inc_map,
        include_rgb=include_rgb,
        include_scattering_area=include_scattering_area,
        radiometry=radiometry,
        resolution=resolution,
        scale=scale,
        speckle_filter=speckle_filter,
        dem_name=dem_name,
    )
    return self.submit_prepared_jobs(prepared_jobs=job_dict)

watch(self, job_or_batch, timeout=10800, interval=60)

Watch jobs until they complete

Parameters:

Name Type Description Default
job_or_batch Union[hyp3_sdk.jobs.Batch, hyp3_sdk.jobs.Job]

A Batch or Job object of jobs to watch

required
timeout int

How long to wait until exiting in seconds

10800
interval Union[int, float]

How often to check for updates in seconds

60

Returns:

Type Description
Union[hyp3_sdk.jobs.Batch, hyp3_sdk.jobs.Job]

A Batch or Job object with refreshed watched jobs

Source code in hyp3_sdk/hyp3.py
@singledispatchmethod
def watch(self, job_or_batch: Union[Batch, Job], timeout: int = 10800,
          interval: Union[int, float] = 60) -> Union[Batch, Job]:
    """Watch jobs until they complete

    This is the fallback of a single-dispatch method: it is reached only for
    argument types without a registered implementation, and always raises.

    Args:
        job_or_batch: A Batch or Job object of jobs to watch
        timeout: How long to wait until exiting in seconds
        interval: How often to check for updates in seconds

    Returns:
        A Batch or Job object with refreshed watched jobs

    Raises:
        NotImplementedError: if `job_or_batch` is of an unsupported type
    """
    message = f'Cannot watch {type(job_or_batch)} type object'
    raise NotImplementedError(message)

jobs

Batch

any_expired(self)

Check succeeded jobs for expiration

Source code in hyp3_sdk/jobs.py
def any_expired(self) -> bool:
    """Check whether at least one job in the batch has expired

    Jobs whose `expired()` check raises a HyP3SDKError are skipped.

    Returns: True as soon as one job reports it has expired, otherwise False
    """
    for candidate in self.jobs:
        try:
            has_expired = candidate.expired()
        except HyP3SDKError:
            has_expired = False
        if has_expired:
            return True
    return False

complete(self)

Returns: True if all jobs are complete, otherwise returns False

Source code in hyp3_sdk/jobs.py
def complete(self) -> bool:
    """Check whether every job in the batch is complete

    Returns: True if all jobs are complete (also for an empty batch), otherwise returns False
    """
    return all(job.complete() for job in self.jobs)

download_files(self, location='.', create=True)

Parameters:

Name Type Description Default
location Union[pathlib.Path, str]

Directory location to put files into

'.'
create bool

Create location if it does not point to an existing directory

True

Returns: list of Path objects to downloaded files

Source code in hyp3_sdk/jobs.py
def download_files(self, location: Union[Path, str] = '.', create: bool = True) -> List[Path]:
    """Download the files of every job in the batch

    A job whose download raises a HyP3SDKError is reported with a printed
    warning and skipped; the remaining jobs are still downloaded.

    Args:
        location: Directory location to put files into
        create: Create `location` if it does not point to an existing directory

    Returns: list of Path objects to downloaded files
    """
    progress_bar = get_tqdm_progress_bar()
    all_files = []
    for job in progress_bar(self.jobs):
        try:
            all_files += job.download_files(location, create)
        except HyP3SDKError as e:
            print(f'Warning: {e}. Skipping download for {job}.')
    return all_files

filter_jobs(self, succeeded=True, running=True, failed=False, include_expired=True)

Filter jobs by status. By default, only succeeded and still running jobs will be in the returned batch.

Parameters:

Name Type Description Default
succeeded bool

Include all succeeded jobs

True
running bool

Include all running jobs

True
failed bool

Include all failed jobs

False
include_expired bool

Include expired jobs in the result

True

Returns:

Type Description
batch

A batch object containing jobs matching any of the selected statuses

Source code in hyp3_sdk/jobs.py
def filter_jobs(
        self, succeeded: bool = True, running: bool = True, failed: bool = False, include_expired: bool = True,
) -> 'Batch':
    """Filter jobs by status. By default, only succeeded and still running jobs will be in the returned batch.

    Args:
        succeeded: Include all succeeded jobs
        running: Include all running jobs
        failed: Include all failed jobs
        include_expired: Include expired jobs in the result; when False,
            succeeded jobs that have expired are left out

    Returns:
         batch: A batch object containing the jobs matching any of the selected statuses
    """
    def is_selected(job) -> bool:
        # Statuses are checked in the order succeeded, running, failed
        if job.succeeded() and succeeded:
            return include_expired or not job.expired()
        if job.running() and running:
            return True
        return job.failed() and failed

    return Batch([job for job in self.jobs if is_selected(job)])

succeeded(self)

Returns: True if all jobs have succeeded, otherwise returns False

Source code in hyp3_sdk/jobs.py
def succeeded(self) -> bool:
    """
    Returns: True when every job in the batch has succeeded (also for an empty batch),
        otherwise False
    """
    # all() stops at the first job that has not succeeded, like an early return would
    return all(job.succeeded() for job in self.jobs)

Job

download_files(self, location='.', create=True)

Parameters:

Name Type Description Default
location Union[pathlib.Path, str]

Directory location to put files into

'.'
create bool

Create location if it does not point to an existing directory

True

Returns: list of Path objects to downloaded files

Source code in hyp3_sdk/jobs.py
def download_files(self, location: Union[Path, str] = '.', create: bool = True) -> List[Path]:
    """Download all files of this job into a directory

    Args:
        location: Directory location to put files into
        create: Create `location` if it does not point to an existing directory

    Returns:
        list of Path objects to downloaded files

    Raises:
        HyP3SDKError: if the job has not succeeded, has expired, or a file download
            fails with an HTTP error
        NotADirectoryError: if `create` is False and `location` is not an existing directory
    """
    location = Path(location)

    if not self.succeeded():
        raise HyP3SDKError(f'Only succeeded jobs can be downloaded; job is {self.status_code}.')
    if self.expired():
        raise HyP3SDKError(f'Expired jobs cannot be downloaded; '
                           f'job expired {self.expiration_time.isoformat(timespec="seconds")}.')

    if create:
        location.mkdir(parents=True, exist_ok=True)
    elif not location.is_dir():
        raise NotADirectoryError(str(location))

    downloaded_files = []
    for file in self.files:
        download_url = file['url']
        filename = location / file['filename']
        try:
            # 10485760 bytes = 10 MiB per streamed chunk
            downloaded_files.append(download_file(download_url, filename, chunk_size=10485760))
        except HTTPError as e:
            # Chain explicitly so the HTTP failure is reported as the direct cause
            raise HyP3SDKError(f'Unable to download file: {download_url}') from e
    return downloaded_files

util

Extra utilities for working with HyP3

chunk(itr, n=200)

Split a sequence into small chunks

Parameters:

Name Type Description Default
itr Sequence[Any]

A sequence object to chunk

required
n int

Size of the chunks to return

200
Source code in hyp3_sdk/util.py
def chunk(itr: Sequence[Any], n: int = 200) -> Generator[Sequence[Any], None, None]:
    """Yield consecutive slices of a sequence, each at most `n` items long

    Args:
        itr: A sequence object to chunk
        n: Size of the chunks to return

    Yields:
        Slices of `itr` in order; the last one may be shorter than `n`

    Raises:
        ValueError: if `n` is not a positive integer (raised when iteration starts)
    """
    size_is_valid = isinstance(n, int) and n >= 1
    if not size_is_valid:
        raise ValueError(f'n must be a positive integer: {n}')

    total = len(itr)
    start = 0
    while start < total:
        yield itr[start:start + n]
        start += n

download_file(url, filepath, chunk_size=None, retries=2, backoff_factor=1)

Download a file

Parameters:

Name Type Description Default
url str

URL of the file to download

required
filepath Union[pathlib.Path, str]

Location to place file into

required
chunk_size

Size to chunk the download into

None
retries

Number of retries to attempt

2
backoff_factor

Factor for calculating time between retries

1

Returns:

Type Description
download_path

The path to the downloaded file

Source code in hyp3_sdk/util.py
def download_file(url: str, filepath: Union[Path, str], chunk_size=None, retries=2, backoff_factor=1) -> Path:
    """Download a file

    Args:
        url: URL of the file to download
        filepath: Location to place file into
        chunk_size: Size to chunk the download into
        retries: Number of retries to attempt
        backoff_factor: Factor for calculating time between retries

    Returns:
        download_path: The path to the downloaded file

    Raises:
        requests.HTTPError: if the response has an error status
    """
    filepath = Path(filepath)
    retry_strategy = Retry(
        total=retries,
        backoff_factor=backoff_factor,
        status_forcelist=[429, 500, 502, 503, 504],
    )

    # Use the session as a context manager so it is closed even when the request,
    # the status check or the write raises.
    with requests.Session() as session:
        session.mount('https://', HTTPAdapter(max_retries=retry_strategy))
        session.mount('http://', HTTPAdapter(max_retries=retry_strategy))

        with session.get(url, stream=True) as s:
            # Check the status before opening the target so a failed request leaves no file behind
            s.raise_for_status()
            tqdm = get_tqdm_progress_bar()
            # Open the file in its own `with` so the handle is closed deterministically
            # instead of depending on the progress-bar wrapper.
            with open(filepath, 'wb') as raw:
                with tqdm.wrapattr(raw, 'write', miniters=1, desc=filepath.name,
                                   total=int(s.headers.get('content-length', 0))) as f:
                    for block in s.iter_content(chunk_size=chunk_size):
                        if block:  # skip empty chunks
                            f.write(block)

    return filepath

extract_zipped_product(zip_file, delete=True)

Extract a zipped HyP3 product

Extract a zipped HyP3 product to the same directory as the zipped HyP3 product, optionally deleting zip file afterward.

Parameters:

Name Type Description Default
zip_file Union[str, pathlib.Path]

Zipped HyP3 product to extract

required
delete bool

Delete zip_file after it has been extracted

True

Returns:

Type Description
Path

Path to the HyP3 product folder containing the product files

Source code in hyp3_sdk/util.py
def extract_zipped_product(zip_file: Union[str, Path], delete: bool = True) -> Path:
    """Unpack a zipped HyP3 product next to the archive

    The archive contents are extracted into the directory that holds `zip_file`; the
    archive itself is removed afterward unless `delete` is False.

    Args:
        zip_file: Zipped HyP3 product to extract
        delete: Delete `zip_file` after it has been extracted

    Returns:
        Path to the HyP3 product folder containing the product files
    """
    archive = Path(zip_file)
    destination = archive.parent

    with ZipFile(archive) as bundle:
        bundle.extractall(path=destination)

    if delete:
        archive.unlink()

    # The product folder is taken to be named after the archive, minus its extension
    return destination / archive.stem

get_authenticated_session(username, password)

Log into HyP3 using credentials for urs.earthdata.nasa.gov from either the provided credentials or a .netrc file.

Returns:

Type Description
Session

An authenticated HyP3 Session

Source code in hyp3_sdk/util.py
def get_authenticated_session(username: str, password: str) -> requests.Session:
    """Log into HyP3 using credentials for `urs.earthdata.nasa.gov` from either the provided
     credentials or a `.netrc` file.

    Args:
        username: Username to authenticate with; used only when `password` is also given
        password: Password to authenticate with; used only when `username` is also given.
            When either is None the request is sent without explicit credentials.

    Returns:
        An authenticated HyP3 Session

    Raises:
        AuthenticationError: if the authentication request returns an HTTP error status
    """
    s = requests.Session()
    if hyp3_sdk.TESTING:
        return s

    if username is not None and password is not None:
        auth = (username, password)
        failure = ('Was not able to authenticate with credentials provided\n'
                   'This could be due to invalid credentials or a connection error.')
    else:
        # No explicit credentials: passing auth=None is the same as omitting it
        auth = None
        failure = ('Was not able to authenticate with .netrc file and no credentials provided\n'
                   'This could be due to invalid credentials in .netrc or a connection error.')

    response = s.get(AUTH_URL, auth=auth)
    try:
        response.raise_for_status()
    except requests.HTTPError as e:
        # The caller never receives the session on failure, so release it here
        s.close()
        raise AuthenticationError(failure) from e
    return s