Skip to content

hyp3_sdk v1.6.0 API Reference

A python wrapper around the HyP3 API

exceptions

Errors and exceptions to raise when the SDK runs into problems

ASFSearchError (HyP3SDKError)

Raise for errors when using the ASF Search module

Source code in hyp3_sdk/exceptions.py
class ASFSearchError(HyP3SDKError):
    """Raise for errors when using the ASF Search module"""

AuthenticationError (HyP3SDKError)

Raise when authentication does not succeed

Source code in hyp3_sdk/exceptions.py
class AuthenticationError(HyP3SDKError):
    """Raise when authentication does not succeed"""

HyP3Error (HyP3SDKError)

Raise for errors when using the HyP3 module

Source code in hyp3_sdk/exceptions.py
class HyP3Error(HyP3SDKError):
    """Raise for errors when using the HyP3 module"""

HyP3SDKError (Exception)

Base Exception for the HyP3 SDK

Source code in hyp3_sdk/exceptions.py
class HyP3SDKError(Exception):
    """Base Exception for the HyP3 SDK"""

ServerError (HyP3SDKError)

Raise when the HyP3 SDK encounters a server error

Source code in hyp3_sdk/exceptions.py
class ServerError(HyP3SDKError):
    """Raise when the HyP3 SDK encounters a server error"""

hyp3

HyP3

A python wrapper around the HyP3 API

Source code in hyp3_sdk/hyp3.py
class HyP3:
    """A python wrapper around the HyP3 API"""

    def __init__(self, api_url: str = PROD_API, username: Optional[str] = None, password: Optional[str] = None,
                 prompt: bool = False):
        """
        Args:
            api_url: Address of the HyP3 API
            username: Username for authenticating to `urs.earthdata.nasa.gov`.
                Both username and password must be provided if either is provided.
            password: Password for authenticating to `urs.earthdata.nasa.gov`.
                Both username and password must be provided if either is provided.
            prompt: Prompt for username and/or password interactively when they
                are not provided as keyword parameters
        """
        self.url = api_url

        if username is None and prompt:
            username = input('NASA Earthdata Login username: ')
        if password is None and prompt:
            password = getpass('NASA Earthdata Login password: ')

        self.session = get_authenticated_session(username, password)
        self.session.headers.update({'User-Agent': f'{hyp3_sdk.__name__}/{hyp3_sdk.__version__}'})

    def find_jobs(self, start: Optional[datetime] = None, end: Optional[datetime] = None,
                  status_code: Optional[str] = None, name: Optional[str] = None,
                  job_type: Optional[str] = None) -> Batch:
        """Gets a Batch of jobs from HyP3 matching the provided search criteria

        Args:
            start: only jobs submitted after given time
            end: only jobs submitted before given time
            status_code: only jobs matching this status (SUCCEEDED, FAILED, RUNNING, PENDING)
            name: only jobs with this name
            job_type: only jobs with this job_type

        Returns:
            A Batch object containing the found jobs
        """
        params = {}
        for param_name in ('start', 'end', 'status_code', 'name', 'job_type'):
            param_value = locals().get(param_name)
            if param_value is not None:
                if isinstance(param_value, datetime):
                    if param_value.tzinfo is None:
                        param_value = param_value.replace(tzinfo=timezone.utc)
                    param_value = param_value.isoformat(timespec='seconds')

                params[param_name] = param_value

        response = self.session.get(urljoin(self.url, '/jobs'), params=params)
        _raise_for_hyp3_status(response)
        jobs = [Job.from_dict(job) for job in response.json()['jobs']]

        while 'next' in response.json():
            next_url = response.json()['next']
            response = self.session.get(next_url)
            _raise_for_hyp3_status(response)
            jobs.extend([Job.from_dict(job) for job in response.json()['jobs']])

        return Batch(jobs)

    def get_job_by_id(self, job_id: str) -> Job:
        """Get job by job ID

        Args:
            job_id: A job ID

        Returns:
            A Job object
        """
        response = self.session.get(urljoin(self.url, f'/jobs/{job_id}'))
        _raise_for_hyp3_status(response)

        return Job.from_dict(response.json())

    @singledispatchmethod
    def watch(self, job_or_batch: Union[Batch, Job], timeout: int = 10800,
              interval: Union[int, float] = 60) -> Union[Batch, Job]:
        """Watch jobs until they complete

        Args:
            job_or_batch: A Batch or Job object of jobs to watch
            timeout: How long to wait until exiting in seconds
            interval: How often to check for updates in seconds

        Returns:
            A Batch or Job object with refreshed watched jobs
        """
        raise NotImplementedError(f'Cannot watch {type(job_or_batch)} type object')

    @watch.register
    def _watch_batch(self, batch: Batch, timeout: int = 10800, interval: Union[int, float] = 60) -> Batch:
        tqdm = get_tqdm_progress_bar()
        iterations_until_timeout = math.ceil(timeout / interval)
        bar_format = '{l_bar}{bar}| {n_fmt}/{total_fmt} [{postfix[0]}]'
        with tqdm(total=len(batch), bar_format=bar_format, postfix=[f'timeout in {timeout} s']) as progress_bar:
            for ii in range(iterations_until_timeout):
                batch = self.refresh(batch)

                counts = batch._count_statuses()
                complete = counts['SUCCEEDED'] + counts['FAILED']

                progress_bar.postfix = [f'timeout in {timeout - ii * interval}s']
                # to control n/total manually; update is n += value
                progress_bar.n = complete
                progress_bar.update(0)

                if batch.complete():
                    return batch
                time.sleep(interval)
        raise HyP3Error(f'Timeout occurred while waiting for {batch}')

    @watch.register
    def _watch_job(self, job: Job, timeout: int = 10800, interval: Union[int, float] = 60) -> Job:
        tqdm = get_tqdm_progress_bar()
        iterations_until_timeout = math.ceil(timeout / interval)
        bar_format = '{n_fmt}/{total_fmt} [{postfix[0]}]'
        with tqdm(total=1, bar_format=bar_format, postfix=[f'timeout in {timeout} s']) as progress_bar:
            for ii in range(iterations_until_timeout):
                job = self.refresh(job)
                progress_bar.postfix = [f'timeout in {timeout - ii * interval}s']
                progress_bar.update(int(job.complete()))

                if job.complete():
                    return job
                time.sleep(interval)
        raise HyP3Error(f'Timeout occurred while waiting for {job}')

    @singledispatchmethod
    def refresh(self, job_or_batch: Union[Batch, Job]) -> Union[Batch, Job]:
        """Refresh each jobs' information

        Args:
            job_or_batch: A Batch of Job object to refresh

        Returns:
            A Batch or Job object with refreshed information
        """
        raise NotImplementedError(f'Cannot refresh {type(job_or_batch)} type object')

    @refresh.register
    def _refresh_batch(self, batch: Batch):
        jobs = []
        for job in batch.jobs:
            jobs.append(self.refresh(job))
        return Batch(jobs)

    @refresh.register
    def _refresh_job(self, job: Job):
        return self.get_job_by_id(job.job_id)

    def submit_prepared_jobs(self, prepared_jobs: Union[dict, List[dict]]) -> Batch:
        """Submit a prepared job dictionary, or list of prepared job dictionaries

        Args:
            prepared_jobs: A prepared job dictionary, or list of prepared job dictionaries

        Returns:
            A Batch object containing the submitted job(s)
        """
        if isinstance(prepared_jobs, dict):
            payload = {'jobs': [prepared_jobs]}
        else:
            payload = {'jobs': prepared_jobs}

        response = self.session.post(urljoin(self.url, '/jobs'), json=payload)
        _raise_for_hyp3_status(response)

        batch = Batch()
        for job in response.json()['jobs']:
            batch += Job.from_dict(job)
        return batch

    def submit_autorift_job(self, granule1: str, granule2: str, name: Optional[str] = None) -> Batch:
        """Submit an autoRIFT job

        Args:
            granule1: The first granule (scene) to use
            granule2: The second granule (scene) to use
            name: A name for the job

        Returns:
            A Batch object containing the autoRIFT job
        """
        job_dict = self.prepare_autorift_job(granule1, granule2, name=name)
        return self.submit_prepared_jobs(prepared_jobs=job_dict)

    @classmethod
    def prepare_autorift_job(cls, granule1: str, granule2: str, name: Optional[str] = None) -> dict:
        """Submit an autoRIFT job

        Args:
            granule1: The first granule (scene) to use
            granule2: The second granule (scene) to use
            name: A name for the job

        Returns:
            A dictionary containing the prepared autoRIFT job
        """
        job_dict = {
            'job_parameters': {'granules': [granule1, granule2]},
            'job_type': 'AUTORIFT',
        }
        if name is not None:
            job_dict['name'] = name
        return job_dict

    def submit_rtc_job(self,
                       granule: str,
                       name: Optional[str] = None,
                       dem_matching: bool = False,
                       include_dem: bool = False,
                       include_inc_map: bool = False,
                       include_rgb: bool = False,
                       include_scattering_area: bool = False,
                       radiometry: Literal['sigma0', 'gamma0'] = 'gamma0',
                       resolution: Literal[30] = 30,
                       scale: Literal['amplitude', 'power'] = 'power',
                       speckle_filter: bool = False,
                       dem_name: Literal['copernicus', 'legacy'] = 'copernicus') -> Batch:
        """Submit an RTC job

        Args:
            granule: The granule (scene) to use
            name: A name for the job
            dem_matching: Coregisters SAR data to the DEM, rather than using dead reckoning based on orbit files
            include_dem: Include the DEM file in the product package
            include_inc_map: Include the local incidence angle map in the product package
            include_rgb: Include a false-color RGB decomposition in the product package for dual-pol granules
                (ignored for single-pol granules)
            include_scattering_area: Include the scattering area in the product package
            radiometry: Backscatter coefficient normalization, either by ground area (sigma0) or illuminated area
                projected into the look direction (gamma0)
            resolution: Desired output pixel spacing in meters
            scale: Scale of output image; either power or amplitude
            speckle_filter: Apply an Enhanced Lee speckle filter
            dem_name: Name of the DEM to use for processing.  `copernicus` will use the Copernicus GLO-30 Public DEM,
                while `legacy` will use the DEM with the best coverage from ASF's legacy SRTM/NED datasets.

        Returns:
            A Batch object containing the RTC job
        """
        arguments = locals()
        arguments.pop('self')
        job_dict = self.prepare_rtc_job(**arguments)
        return self.submit_prepared_jobs(prepared_jobs=job_dict)

    @classmethod
    def prepare_rtc_job(cls,
                        granule: str,
                        name: Optional[str] = None,
                        dem_matching: bool = False,
                        include_dem: bool = False,
                        include_inc_map: bool = False,
                        include_rgb: bool = False,
                        include_scattering_area: bool = False,
                        radiometry: Literal['sigma0', 'gamma0'] = 'gamma0',
                        resolution: Literal[30] = 30,
                        scale: Literal['amplitude', 'power'] = 'power',
                        speckle_filter: bool = False,
                        dem_name: Literal['copernicus', 'legacy'] = 'copernicus') -> dict:
        """Submit an RTC job

        Args:
            granule: The granule (scene) to use
            name: A name for the job
            dem_matching: Coregisters SAR data to the DEM, rather than using dead reckoning based on orbit files
            include_dem: Include the DEM file in the product package
            include_inc_map: Include the local incidence angle map in the product package
            include_rgb: Include a false-color RGB decomposition in the product package for dual-pol granules
                (ignored for single-pol granules)
            include_scattering_area: Include the scattering area in the product package
            radiometry: Backscatter coefficient normalization, either by ground area (sigma0) or illuminated area
                projected into the look direction (gamma0)
            resolution: Desired output pixel spacing in meters
            scale: Scale of output image; either power or amplitude
            speckle_filter: Apply an Enhanced Lee speckle filter
            dem_name: Name of the DEM to use for processing.  `copernicus` will use the Copernicus GLO-30 Public DEM,
                while `legacy` will use the DEM with the best coverage from ASF's legacy SRTM/NED datasets.

        Returns:
            A dictionary containing the prepared RTC job
        """
        job_parameters = locals().copy()
        for key in ['granule', 'name', 'cls']:
            job_parameters.pop(key, None)

        job_dict = {
            'job_parameters': {'granules': [granule], **job_parameters},
            'job_type': 'RTC_GAMMA',
        }

        if name is not None:
            job_dict['name'] = name
        return job_dict

    def submit_insar_job(self,
                         granule1: str,
                         granule2: str,
                         name: Optional[str] = None,
                         include_look_vectors: bool = False,
                         include_los_displacement: bool = False,
                         include_inc_map: bool = False,
                         looks: Literal['20x4', '10x2'] = '20x4',
                         include_dem: bool = False,
                         include_wrapped_phase: bool = False,
                         apply_water_mask: bool = False,
                         include_displacement_maps: bool = False) -> Batch:
        """Submit an InSAR job

        Args:
            granule1: The first granule (scene) to use
            granule2: The second granule (scene) to use
            name: A name for the job
            include_look_vectors: Include the look vector theta and phi files in the product package
            include_los_displacement: Include a GeoTIFF in the product package containing displacement values
                along the Line-Of-Sight (LOS). This parameter has been deprecated in favor of
                `include_displacement_maps`, and will be removed in a future release.
            include_inc_map: Include the local and ellipsoidal incidence angle maps in the product package
            looks: Number of looks to take in range and azimuth
            include_dem: Include the digital elevation model GeoTIFF in the product package
            include_wrapped_phase: Include the wrapped phase GeoTIFF in the product package
            apply_water_mask: Sets pixels over coastal waters and large inland waterbodies
                as invalid for phase unwrapping
            include_displacement_maps: Include displacement maps (line-of-sight and vertical) in the product package

        Returns:
            A Batch object containing the InSAR job
        """
        arguments = locals().copy()
        arguments.pop('self')
        job_dict = self.prepare_insar_job(**arguments)
        return self.submit_prepared_jobs(prepared_jobs=job_dict)

    @classmethod
    def prepare_insar_job(cls,
                          granule1: str,
                          granule2: str,
                          name: Optional[str] = None,
                          include_look_vectors: bool = False,
                          include_los_displacement: bool = False,
                          include_inc_map: bool = False,
                          looks: Literal['20x4', '10x2'] = '20x4',
                          include_dem: bool = False,
                          include_wrapped_phase: bool = False,
                          apply_water_mask: bool = False,
                          include_displacement_maps: bool = False) -> dict:
        """Submit an InSAR job

        Args:
            granule1: The first granule (scene) to use
            granule2: The second granule (scene) to use
            name: A name for the job
            include_look_vectors: Include the look vector theta and phi files in the product package
            include_los_displacement: Include a GeoTIFF in the product package containing displacement values
                along the Line-Of-Sight (LOS). This parameter has been deprecated in favor of
                `include_displacement_maps`, and will be removed in a future release.
            include_inc_map: Include the local and ellipsoidal incidence angle maps in the product package
            looks: Number of looks to take in range and azimuth
            include_dem: Include the digital elevation model GeoTIFF in the product package
            include_wrapped_phase: Include the wrapped phase GeoTIFF in the product package
            apply_water_mask: Sets pixels over coastal waters and large inland waterbodies
                as invalid for phase unwrapping
            include_displacement_maps: Include displacement maps (line-of-sight and vertical) in the product package
        Returns:
            A dictionary containing the prepared InSAR job
        """
        if include_los_displacement:
            warnings.warn('The include_los_displacement parameter has been deprecated in favor of '
                          'include_displacement_maps, and will be removed in a future release.', FutureWarning)

        job_parameters = locals().copy()
        for key in ['cls', 'granule1', 'granule2', 'name']:
            job_parameters.pop(key)

        job_dict = {
            'job_parameters': {'granules': [granule1, granule2], **job_parameters},
            'job_type': 'INSAR_GAMMA',
        }
        if name is not None:
            job_dict['name'] = name
        return job_dict

    def my_info(self) -> dict:
        """
        Returns:
            Your user information
        """
        response = self.session.get(urljoin(self.url, '/user'))
        _raise_for_hyp3_status(response)
        return response.json()

    def check_quota(self) -> Optional[int]:
        """
        Returns:
            The number of jobs left in your quota, or None if you have no quota
        """
        info = self.my_info()
        return info['quota']['remaining']

__init__(self, api_url='https://hyp3-api.asf.alaska.edu', username=None, password=None, prompt=False) special

Parameters:

Name Type Description Default
api_url str

Address of the HyP3 API

'https://hyp3-api.asf.alaska.edu'
username Optional[str]

Username for authenticating to urs.earthdata.nasa.gov. Both username and password must be provided if either is provided.

None
password Optional[str]

Password for authenticating to urs.earthdata.nasa.gov. Both username and password must be provided if either is provided.

None
prompt bool

Prompt for username and/or password interactively when they are not provided as keyword parameters

False
Source code in hyp3_sdk/hyp3.py
def __init__(self, api_url: str = PROD_API, username: Optional[str] = None, password: Optional[str] = None,
             prompt: bool = False):
    """
    Args:
        api_url: Address of the HyP3 API
        username: Username for authenticating to `urs.earthdata.nasa.gov`.
            Both username and password must be provided if either is provided.
        password: Password for authenticating to `urs.earthdata.nasa.gov`.
            Both username and password must be provided if either is provided.
        prompt: Prompt for username and/or password interactively when they
            are not provided as keyword parameters
    """
    self.url = api_url

    if username is None and prompt:
        username = input('NASA Earthdata Login username: ')
    if password is None and prompt:
        password = getpass('NASA Earthdata Login password: ')

    self.session = get_authenticated_session(username, password)
    self.session.headers.update({'User-Agent': f'{hyp3_sdk.__name__}/{hyp3_sdk.__version__}'})

check_quota(self)

Returns:

Type Description
Optional[int]

The number of jobs left in your quota, or None if you have no quota

Source code in hyp3_sdk/hyp3.py
def check_quota(self) -> Optional[int]:
    """
    Returns:
        The number of jobs left in your quota, or None if you have no quota
    """
    info = self.my_info()
    return info['quota']['remaining']

find_jobs(self, start=None, end=None, status_code=None, name=None, job_type=None)

Gets a Batch of jobs from HyP3 matching the provided search criteria

Parameters:

Name Type Description Default
start Optional[datetime.datetime]

only jobs submitted after given time

None
end Optional[datetime.datetime]

only jobs submitted before given time

None
status_code Optional[str]

only jobs matching this status (SUCCEEDED, FAILED, RUNNING, PENDING)

None
name Optional[str]

only jobs with this name

None
job_type Optional[str]

only jobs with this job_type

None

Returns:

Type Description
Batch

A Batch object containing the found jobs

Source code in hyp3_sdk/hyp3.py
def find_jobs(self, start: Optional[datetime] = None, end: Optional[datetime] = None,
              status_code: Optional[str] = None, name: Optional[str] = None,
              job_type: Optional[str] = None) -> Batch:
    """Gets a Batch of jobs from HyP3 matching the provided search criteria

    Args:
        start: only jobs submitted after given time
        end: only jobs submitted before given time
        status_code: only jobs matching this status (SUCCEEDED, FAILED, RUNNING, PENDING)
        name: only jobs with this name
        job_type: only jobs with this job_type

    Returns:
        A Batch object containing the found jobs
    """
    params = {}
    for param_name in ('start', 'end', 'status_code', 'name', 'job_type'):
        param_value = locals().get(param_name)
        if param_value is not None:
            if isinstance(param_value, datetime):
                if param_value.tzinfo is None:
                    param_value = param_value.replace(tzinfo=timezone.utc)
                param_value = param_value.isoformat(timespec='seconds')

            params[param_name] = param_value

    response = self.session.get(urljoin(self.url, '/jobs'), params=params)
    _raise_for_hyp3_status(response)
    jobs = [Job.from_dict(job) for job in response.json()['jobs']]

    while 'next' in response.json():
        next_url = response.json()['next']
        response = self.session.get(next_url)
        _raise_for_hyp3_status(response)
        jobs.extend([Job.from_dict(job) for job in response.json()['jobs']])

    return Batch(jobs)

get_job_by_id(self, job_id)

Get job by job ID

Parameters:

Name Type Description Default
job_id str

A job ID

required

Returns:

Type Description
Job

A Job object

Source code in hyp3_sdk/hyp3.py
def get_job_by_id(self, job_id: str) -> Job:
    """Get job by job ID

    Args:
        job_id: A job ID

    Returns:
        A Job object
    """
    response = self.session.get(urljoin(self.url, f'/jobs/{job_id}'))
    _raise_for_hyp3_status(response)

    return Job.from_dict(response.json())

my_info(self)

Returns:

Type Description
dict

Your user information

Source code in hyp3_sdk/hyp3.py
def my_info(self) -> dict:
    """
    Returns:
        Your user information
    """
    response = self.session.get(urljoin(self.url, '/user'))
    _raise_for_hyp3_status(response)
    return response.json()

prepare_autorift_job(granule1, granule2, name=None) classmethod

Submit an autoRIFT job

Parameters:

Name Type Description Default
granule1 str

The first granule (scene) to use

required
granule2 str

The second granule (scene) to use

required
name Optional[str]

A name for the job

None

Returns:

Type Description
dict

A dictionary containing the prepared autoRIFT job

Source code in hyp3_sdk/hyp3.py
@classmethod
def prepare_autorift_job(cls, granule1: str, granule2: str, name: Optional[str] = None) -> dict:
    """Submit an autoRIFT job

    Args:
        granule1: The first granule (scene) to use
        granule2: The second granule (scene) to use
        name: A name for the job

    Returns:
        A dictionary containing the prepared autoRIFT job
    """
    job_dict = {
        'job_parameters': {'granules': [granule1, granule2]},
        'job_type': 'AUTORIFT',
    }
    if name is not None:
        job_dict['name'] = name
    return job_dict

prepare_insar_job(granule1, granule2, name=None, include_look_vectors=False, include_los_displacement=False, include_inc_map=False, looks='20x4', include_dem=False, include_wrapped_phase=False, apply_water_mask=False, include_displacement_maps=False) classmethod

Submit an InSAR job

Parameters:

Name Type Description Default
granule1 str

The first granule (scene) to use

required
granule2 str

The second granule (scene) to use

required
name Optional[str]

A name for the job

None
include_look_vectors bool

Include the look vector theta and phi files in the product package

False
include_los_displacement bool

Include a GeoTIFF in the product package containing displacement values along the Line-Of-Sight (LOS). This parameter has been deprecated in favor of include_displacement_maps, and will be removed in a future release.

False
include_inc_map bool

Include the local and ellipsoidal incidence angle maps in the product package

False
looks Literal['20x4', '10x2']

Number of looks to take in range and azimuth

'20x4'
include_dem bool

Include the digital elevation model GeoTIFF in the product package

False
include_wrapped_phase bool

Include the wrapped phase GeoTIFF in the product package

False
apply_water_mask bool

Sets pixels over coastal waters and large inland waterbodies as invalid for phase unwrapping

False
include_displacement_maps bool

Include displacement maps (line-of-sight and vertical) in the product package

False

Returns:

Type Description
dict

A dictionary containing the prepared InSAR job

Source code in hyp3_sdk/hyp3.py
@classmethod
def prepare_insar_job(cls,
                      granule1: str,
                      granule2: str,
                      name: Optional[str] = None,
                      include_look_vectors: bool = False,
                      include_los_displacement: bool = False,
                      include_inc_map: bool = False,
                      looks: Literal['20x4', '10x2'] = '20x4',
                      include_dem: bool = False,
                      include_wrapped_phase: bool = False,
                      apply_water_mask: bool = False,
                      include_displacement_maps: bool = False) -> dict:
    """Submit an InSAR job

    Args:
        granule1: The first granule (scene) to use
        granule2: The second granule (scene) to use
        name: A name for the job
        include_look_vectors: Include the look vector theta and phi files in the product package
        include_los_displacement: Include a GeoTIFF in the product package containing displacement values
            along the Line-Of-Sight (LOS). This parameter has been deprecated in favor of
            `include_displacement_maps`, and will be removed in a future release.
        include_inc_map: Include the local and ellipsoidal incidence angle maps in the product package
        looks: Number of looks to take in range and azimuth
        include_dem: Include the digital elevation model GeoTIFF in the product package
        include_wrapped_phase: Include the wrapped phase GeoTIFF in the product package
        apply_water_mask: Sets pixels over coastal waters and large inland waterbodies
            as invalid for phase unwrapping
        include_displacement_maps: Include displacement maps (line-of-sight and vertical) in the product package
    Returns:
        A dictionary containing the prepared InSAR job
    """
    if include_los_displacement:
        warnings.warn('The include_los_displacement parameter has been deprecated in favor of '
                      'include_displacement_maps, and will be removed in a future release.', FutureWarning)

    job_parameters = locals().copy()
    for key in ['cls', 'granule1', 'granule2', 'name']:
        job_parameters.pop(key)

    job_dict = {
        'job_parameters': {'granules': [granule1, granule2], **job_parameters},
        'job_type': 'INSAR_GAMMA',
    }
    if name is not None:
        job_dict['name'] = name
    return job_dict

prepare_rtc_job(granule, name=None, dem_matching=False, include_dem=False, include_inc_map=False, include_rgb=False, include_scattering_area=False, radiometry='gamma0', resolution=30, scale='power', speckle_filter=False, dem_name='copernicus') classmethod

Submit an RTC job

Parameters:

Name Type Description Default
granule str

The granule (scene) to use

required
name Optional[str]

A name for the job

None
dem_matching bool

Coregisters SAR data to the DEM, rather than using dead reckoning based on orbit files

False
include_dem bool

Include the DEM file in the product package

False
include_inc_map bool

Include the local incidence angle map in the product package

False
include_rgb bool

Include a false-color RGB decomposition in the product package for dual-pol granules (ignored for single-pol granules)

False
include_scattering_area bool

Include the scattering area in the product package

False
radiometry Literal['sigma0', 'gamma0']

Backscatter coefficient normalization, either by ground area (sigma0) or illuminated area projected into the look direction (gamma0)

'gamma0'
resolution Literal[30]

Desired output pixel spacing in meters

30
scale Literal['amplitude', 'power']

Scale of output image; either power or amplitude

'power'
speckle_filter bool

Apply an Enhanced Lee speckle filter

False
dem_name Literal['copernicus', 'legacy']

Name of the DEM to use for processing. copernicus will use the Copernicus GLO-30 Public DEM, while legacy will use the DEM with the best coverage from ASF's legacy SRTM/NED datasets.

'copernicus'

Returns:

Type Description
dict

A dictionary containing the prepared RTC job

Source code in hyp3_sdk/hyp3.py
@classmethod
def prepare_rtc_job(cls,
                    granule: str,
                    name: Optional[str] = None,
                    dem_matching: bool = False,
                    include_dem: bool = False,
                    include_inc_map: bool = False,
                    include_rgb: bool = False,
                    include_scattering_area: bool = False,
                    radiometry: Literal['sigma0', 'gamma0'] = 'gamma0',
                    resolution: Literal[30] = 30,
                    scale: Literal['amplitude', 'power'] = 'power',
                    speckle_filter: bool = False,
                    dem_name: Literal['copernicus', 'legacy'] = 'copernicus') -> dict:
    """Submit an RTC job

    Args:
        granule: The granule (scene) to use
        name: A name for the job
        dem_matching: Coregisters SAR data to the DEM, rather than using dead reckoning based on orbit files
        include_dem: Include the DEM file in the product package
        include_inc_map: Include the local incidence angle map in the product package
        include_rgb: Include a false-color RGB decomposition in the product package for dual-pol granules
            (ignored for single-pol granules)
        include_scattering_area: Include the scattering area in the product package
        radiometry: Backscatter coefficient normalization, either by ground area (sigma0) or illuminated area
            projected into the look direction (gamma0)
        resolution: Desired output pixel spacing in meters
        scale: Scale of output image; either power or amplitude
        speckle_filter: Apply an Enhanced Lee speckle filter
        dem_name: Name of the DEM to use for processing.  `copernicus` will use the Copernicus GLO-30 Public DEM,
            while `legacy` will use the DEM with the best coverage from ASF's legacy SRTM/NED datasets.

    Returns:
        A dictionary containing the prepared RTC job
    """
    job_parameters = locals().copy()
    for key in ['granule', 'name', 'cls']:
        job_parameters.pop(key, None)

    job_dict = {
        'job_parameters': {'granules': [granule], **job_parameters},
        'job_type': 'RTC_GAMMA',
    }

    if name is not None:
        job_dict['name'] = name
    return job_dict

refresh(self, job_or_batch)

Refresh each jobs' information

Parameters:

Name Type Description Default
job_or_batch Union[hyp3_sdk.jobs.Batch, hyp3_sdk.jobs.Job]

A Batch of Job object to refresh

required

Returns:

Type Description
Union[hyp3_sdk.jobs.Batch, hyp3_sdk.jobs.Job]

A Batch or Job object with refreshed information

Source code in hyp3_sdk/hyp3.py
@singledispatchmethod
def refresh(self, job_or_batch: Union[Batch, Job]) -> Union[Batch, Job]:
    """Refresh each jobs' information

    Args:
        job_or_batch: A Batch of Job object to refresh

    Returns:
        A Batch or Job object with refreshed information
    """
    raise NotImplementedError(f'Cannot refresh {type(job_or_batch)} type object')

submit_autorift_job(self, granule1, granule2, name=None)

Submit an autoRIFT job

Parameters:

Name Type Description Default
granule1 str

The first granule (scene) to use

required
granule2 str

The second granule (scene) to use

required
name Optional[str]

A name for the job

None

Returns:

Type Description
Batch

A Batch object containing the autoRIFT job

Source code in hyp3_sdk/hyp3.py
def submit_autorift_job(self, granule1: str, granule2: str, name: Optional[str] = None) -> Batch:
    """Submit an autoRIFT job

    Args:
        granule1: The first granule (scene) to use
        granule2: The second granule (scene) to use
        name: A name for the job

    Returns:
        A Batch object containing the autoRIFT job
    """
    job_dict = self.prepare_autorift_job(granule1, granule2, name=name)
    return self.submit_prepared_jobs(prepared_jobs=job_dict)

submit_insar_job(self, granule1, granule2, name=None, include_look_vectors=False, include_los_displacement=False, include_inc_map=False, looks='20x4', include_dem=False, include_wrapped_phase=False, apply_water_mask=False, include_displacement_maps=False)

Submit an InSAR job

Parameters:

Name Type Description Default
granule1 str

The first granule (scene) to use

required
granule2 str

The second granule (scene) to use

required
name Optional[str]

A name for the job

None
include_look_vectors bool

Include the look vector theta and phi files in the product package

False
include_los_displacement bool

Include a GeoTIFF in the product package containing displacement values along the Line-Of-Sight (LOS). This parameter has been deprecated in favor of include_displacement_maps, and will be removed in a future release.

False
include_inc_map bool

Include the local and ellipsoidal incidence angle maps in the product package

False
looks Literal['20x4', '10x2']

Number of looks to take in range and azimuth

'20x4'
include_dem bool

Include the digital elevation model GeoTIFF in the product package

False
include_wrapped_phase bool

Include the wrapped phase GeoTIFF in the product package

False
apply_water_mask bool

Sets pixels over coastal waters and large inland waterbodies as invalid for phase unwrapping

False
include_displacement_maps bool

Include displacement maps (line-of-sight and vertical) in the product package

False

Returns:

Type Description
Batch

A Batch object containing the InSAR job

Source code in hyp3_sdk/hyp3.py
def submit_insar_job(self,
                     granule1: str,
                     granule2: str,
                     name: Optional[str] = None,
                     include_look_vectors: bool = False,
                     include_los_displacement: bool = False,
                     include_inc_map: bool = False,
                     looks: Literal['20x4', '10x2'] = '20x4',
                     include_dem: bool = False,
                     include_wrapped_phase: bool = False,
                     apply_water_mask: bool = False,
                     include_displacement_maps: bool = False) -> Batch:
    """Submit an InSAR job

    Args:
        granule1: The first granule (scene) to use
        granule2: The second granule (scene) to use
        name: A name for the job
        include_look_vectors: Include the look vector theta and phi files in the product package
        include_los_displacement: Include a GeoTIFF in the product package containing displacement values
            along the Line-Of-Sight (LOS). This parameter has been deprecated in favor of
            `include_displacement_maps`, and will be removed in a future release.
        include_inc_map: Include the local and ellipsoidal incidence angle maps in the product package
        looks: Number of looks to take in range and azimuth
        include_dem: Include the digital elevation model GeoTIFF in the product package
        include_wrapped_phase: Include the wrapped phase GeoTIFF in the product package
        apply_water_mask: Sets pixels over coastal waters and large inland waterbodies
            as invalid for phase unwrapping
        include_displacement_maps: Include displacement maps (line-of-sight and vertical) in the product package

    Returns:
        A Batch object containing the InSAR job
    """
    arguments = locals().copy()
    arguments.pop('self')
    job_dict = self.prepare_insar_job(**arguments)
    return self.submit_prepared_jobs(prepared_jobs=job_dict)

submit_prepared_jobs(self, prepared_jobs)

Submit a prepared job dictionary, or list of prepared job dictionaries

Parameters:

Name Type Description Default
prepared_jobs Union[dict, List[dict]]

A prepared job dictionary, or list of prepared job dictionaries

required

Returns:

Type Description
Batch

A Batch object containing the submitted job(s)

Source code in hyp3_sdk/hyp3.py
def submit_prepared_jobs(self, prepared_jobs: Union[dict, List[dict]]) -> Batch:
    """Submit a prepared job dictionary, or list of prepared job dictionaries

    Args:
        prepared_jobs: A prepared job dictionary, or list of prepared job dictionaries

    Returns:
        A Batch object containing the submitted job(s)
    """
    if isinstance(prepared_jobs, dict):
        payload = {'jobs': [prepared_jobs]}
    else:
        payload = {'jobs': prepared_jobs}

    response = self.session.post(urljoin(self.url, '/jobs'), json=payload)
    _raise_for_hyp3_status(response)

    batch = Batch()
    for job in response.json()['jobs']:
        batch += Job.from_dict(job)
    return batch

submit_rtc_job(self, granule, name=None, dem_matching=False, include_dem=False, include_inc_map=False, include_rgb=False, include_scattering_area=False, radiometry='gamma0', resolution=30, scale='power', speckle_filter=False, dem_name='copernicus')

Submit an RTC job

Parameters:

Name Type Description Default
granule str

The granule (scene) to use

required
name Optional[str]

A name for the job

None
dem_matching bool

Coregisters SAR data to the DEM, rather than using dead reckoning based on orbit files

False
include_dem bool

Include the DEM file in the product package

False
include_inc_map bool

Include the local incidence angle map in the product package

False
include_rgb bool

Include a false-color RGB decomposition in the product package for dual-pol granules (ignored for single-pol granules)

False
include_scattering_area bool

Include the scattering area in the product package

False
radiometry Literal['sigma0', 'gamma0']

Backscatter coefficient normalization, either by ground area (sigma0) or illuminated area projected into the look direction (gamma0)

'gamma0'
resolution Literal[30]

Desired output pixel spacing in meters

30
scale Literal['amplitude', 'power']

Scale of output image; either power or amplitude

'power'
speckle_filter bool

Apply an Enhanced Lee speckle filter

False
dem_name Literal['copernicus', 'legacy']

Name of the DEM to use for processing. copernicus will use the Copernicus GLO-30 Public DEM, while legacy will use the DEM with the best coverage from ASF's legacy SRTM/NED datasets.

'copernicus'

Returns:

Type Description
Batch

A Batch object containing the RTC job

Source code in hyp3_sdk/hyp3.py
def submit_rtc_job(self,
                   granule: str,
                   name: Optional[str] = None,
                   dem_matching: bool = False,
                   include_dem: bool = False,
                   include_inc_map: bool = False,
                   include_rgb: bool = False,
                   include_scattering_area: bool = False,
                   radiometry: Literal['sigma0', 'gamma0'] = 'gamma0',
                   resolution: Literal[30] = 30,
                   scale: Literal['amplitude', 'power'] = 'power',
                   speckle_filter: bool = False,
                   dem_name: Literal['copernicus', 'legacy'] = 'copernicus') -> Batch:
    """Submit an RTC job

    Args:
        granule: The granule (scene) to use
        name: A name for the job
        dem_matching: Coregisters SAR data to the DEM, rather than using dead reckoning based on orbit files
        include_dem: Include the DEM file in the product package
        include_inc_map: Include the local incidence angle map in the product package
        include_rgb: Include a false-color RGB decomposition in the product package for dual-pol granules
            (ignored for single-pol granules)
        include_scattering_area: Include the scattering area in the product package
        radiometry: Backscatter coefficient normalization, either by ground area (sigma0) or illuminated area
            projected into the look direction (gamma0)
        resolution: Desired output pixel spacing in meters
        scale: Scale of output image; either power or amplitude
        speckle_filter: Apply an Enhanced Lee speckle filter
        dem_name: Name of the DEM to use for processing.  `copernicus` will use the Copernicus GLO-30 Public DEM,
            while `legacy` will use the DEM with the best coverage from ASF's legacy SRTM/NED datasets.

    Returns:
        A Batch object containing the RTC job
    """
    arguments = locals()
    arguments.pop('self')
    job_dict = self.prepare_rtc_job(**arguments)
    return self.submit_prepared_jobs(prepared_jobs=job_dict)

watch(self, job_or_batch, timeout=10800, interval=60)

Watch jobs until they complete

Parameters:

Name Type Description Default
job_or_batch Union[hyp3_sdk.jobs.Batch, hyp3_sdk.jobs.Job]

A Batch or Job object of jobs to watch

required
timeout int

How long to wait until exiting in seconds

10800
interval Union[int, float]

How often to check for updates in seconds

60

Returns:

Type Description
Union[hyp3_sdk.jobs.Batch, hyp3_sdk.jobs.Job]

A Batch or Job object with refreshed watched jobs

Source code in hyp3_sdk/hyp3.py
@singledispatchmethod
def watch(self, job_or_batch: Union[Batch, Job], timeout: int = 10800,
          interval: Union[int, float] = 60) -> Union[Batch, Job]:
    """Watch jobs until they complete

    Args:
        job_or_batch: A Batch or Job object of jobs to watch
        timeout: How long to wait until exiting in seconds
        interval: How often to check for updates in seconds

    Returns:
        A Batch or Job object with refreshed watched jobs
    """
    raise NotImplementedError(f'Cannot watch {type(job_or_batch)} type object')

jobs

Batch

Source code in hyp3_sdk/jobs.py
class Batch:
    def __init__(self, jobs: Optional[List[Job]] = None):
        if jobs is None:
            jobs = []
        self.jobs = jobs

    def __add__(self, other: Union[Job, 'Batch']):
        if isinstance(other, Batch):
            return Batch(self.jobs + other.jobs)
        elif isinstance(other, Job):
            return Batch(self.jobs + [other])
        else:
            raise TypeError(f"unsupported operand type(s) for +: '{type(self)}' and '{type(other)}'")

    def __iadd__(self, other: Union[Job, 'Batch']):
        if isinstance(other, Batch):
            self.jobs += other.jobs
        elif isinstance(other, Job):
            self.jobs += [other]
        else:
            raise TypeError(f"unsupported operand type(s) for +=: '{type(self)}' and '{type(other)}'")
        return self

    def __iter__(self):
        return iter(self.jobs)

    def __len__(self):
        return len(self.jobs)

    def __contains__(self, job: Job):
        return job in self.jobs

    def __eq__(self, other: 'Batch'):
        return self.jobs == other.jobs

    def __delitem__(self, job: int):
        self.jobs.pop(job)
        return self

    def __getitem__(self, index: int):
        if isinstance(index, slice):
            return Batch(self.jobs[index])
        return self.jobs[index]

    def __setitem__(self, index: int, job: Job):
        self.jobs[index] = job
        return self

    def __repr__(self):
        reprs = ", ".join([job.__repr__() for job in self.jobs])
        return f'Batch([{reprs}])'

    def __str__(self):
        count = self._count_statuses()
        return f'{len(self)} HyP3 Jobs: ' \
               f'{count["SUCCEEDED"]} succeeded, ' \
               f'{count["FAILED"]} failed, ' \
               f'{count["RUNNING"]} running, ' \
               f'{count["PENDING"]} pending.'

    def _count_statuses(self):
        return Counter([job.status_code for job in self.jobs])

    def complete(self) -> bool:
        """
        Returns: True if all jobs are complete, otherwise returns False
        """
        for job in self.jobs:
            if not job.complete():
                return False
        return True

    def succeeded(self) -> bool:
        """
        Returns: True if all jobs have succeeded, otherwise returns False
        """
        for job in self.jobs:
            if not job.succeeded():
                return False
        return True

    def download_files(self, location: Union[Path, str] = '.', create: bool = True) -> List[Path]:
        """
        Args:
            location: Directory location to put files into
            create: Create `location` if it does not point to an existing directory

        Returns: list of Path objects to downloaded files
        """
        downloaded_files = []
        tqdm = get_tqdm_progress_bar()
        for job in tqdm(self.jobs):
            try:
                downloaded_files.extend(job.download_files(location, create))
            except HyP3SDKError as e:
                print(f'Warning: {e}. Skipping download for {job}.')
        return downloaded_files

    def any_expired(self) -> bool:
        """Check succeeded jobs for expiration"""
        for job in self.jobs:
            try:
                if job.expired():
                    return True
            except HyP3SDKError:
                continue
        return False

    def filter_jobs(
            self, succeeded: bool = True, running: bool = True, failed: bool = False, include_expired: bool = True,
    ) -> 'Batch':
        """Filter jobs by status. By default, only succeeded and still running jobs will be in the returned batch.

        Args:
            succeeded: Include all succeeded jobs
            running: Include all running jobs
            failed: Include all failed jobs
            include_expired: Include expired jobs in the result


        Returns:
             batch: A batch object containing jobs matching all the selected statuses
        """
        filtered_jobs = []

        for job in self.jobs:
            if job.succeeded() and succeeded:
                if include_expired or not job.expired():
                    filtered_jobs.append(job)

            elif job.running() and running:
                filtered_jobs.append(job)

            elif job.failed() and failed:
                filtered_jobs.append(job)

        return Batch(filtered_jobs)

any_expired(self)

Check succeeded jobs for expiration

Source code in hyp3_sdk/jobs.py
def any_expired(self) -> bool:
    """Check succeeded jobs for expiration"""
    for job in self.jobs:
        try:
            if job.expired():
                return True
        except HyP3SDKError:
            continue
    return False

complete(self)

Returns: True if all jobs are complete, otherwise returns False

Source code in hyp3_sdk/jobs.py
def complete(self) -> bool:
    """
    Returns: True if all jobs are complete, otherwise returns False
    """
    for job in self.jobs:
        if not job.complete():
            return False
    return True

download_files(self, location='.', create=True)

Parameters:

Name Type Description Default
location Union[pathlib.Path, str]

Directory location to put files into

'.'
create bool

Create location if it does not point to an existing directory

True

Returns: list of Path objects to downloaded files

Source code in hyp3_sdk/jobs.py
def download_files(self, location: Union[Path, str] = '.', create: bool = True) -> List[Path]:
    """
    Args:
        location: Directory location to put files into
        create: Create `location` if it does not point to an existing directory

    Returns: list of Path objects to downloaded files
    """
    downloaded_files = []
    tqdm = get_tqdm_progress_bar()
    for job in tqdm(self.jobs):
        try:
            downloaded_files.extend(job.download_files(location, create))
        except HyP3SDKError as e:
            print(f'Warning: {e}. Skipping download for {job}.')
    return downloaded_files

filter_jobs(self, succeeded=True, running=True, failed=False, include_expired=True)

Filter jobs by status. By default, only succeeded and still running jobs will be in the returned batch.

Parameters:

Name Type Description Default
succeeded bool

Include all succeeded jobs

True
running bool

Include all running jobs

True
failed bool

Include all failed jobs

False
include_expired bool

Include expired jobs in the result

True

Returns:

Type Description
batch

A batch object containing jobs matching all the selected statuses

Source code in hyp3_sdk/jobs.py
def filter_jobs(
        self, succeeded: bool = True, running: bool = True, failed: bool = False, include_expired: bool = True,
) -> 'Batch':
    """Filter jobs by status. By default, only succeeded and still running jobs will be in the returned batch.

    Args:
        succeeded: Include all succeeded jobs
        running: Include all running jobs
        failed: Include all failed jobs
        include_expired: Include expired jobs in the result


    Returns:
         batch: A batch object containing jobs matching all the selected statuses
    """
    filtered_jobs = []

    for job in self.jobs:
        if job.succeeded() and succeeded:
            if include_expired or not job.expired():
                filtered_jobs.append(job)

        elif job.running() and running:
            filtered_jobs.append(job)

        elif job.failed() and failed:
            filtered_jobs.append(job)

    return Batch(filtered_jobs)

succeeded(self)

Returns: True if all jobs have succeeded, otherwise returns False

Source code in hyp3_sdk/jobs.py
def succeeded(self) -> bool:
    """
    Returns: True if all jobs have succeeded, otherwise returns False
    """
    for job in self.jobs:
        if not job.succeeded():
            return False
    return True

Job

Source code in hyp3_sdk/jobs.py
class Job:
    _attributes_for_resubmit = {'name', 'job_parameters', 'job_type'}

    def __init__(
            self,
            job_type: str,
            job_id: str,
            request_time: datetime,
            status_code: str,
            user_id: str,
            name: Optional[str] = None,
            job_parameters: Optional[dict] = None,
            files: Optional[List] = None,
            logs: Optional[List] = None,
            browse_images: Optional[List] = None,
            thumbnail_images: Optional[List] = None,
            expiration_time: Optional[datetime] = None,
            processing_time_in_seconds: Optional[int] = None,
    ):
        self.job_id = job_id
        self.job_type = job_type
        self.request_time = request_time
        self.status_code = status_code
        self.user_id = user_id
        self.name = name
        self.job_parameters = job_parameters
        self.files = files
        self.logs = logs
        self.browse_images = browse_images
        self.thumbnail_images = thumbnail_images
        self.expiration_time = expiration_time
        self.processing_time_in_seconds = processing_time_in_seconds

    def __repr__(self):
        return f'Job.from_dict({self.to_dict()})'

    def __str__(self):
        return f'HyP3 {self.job_type} job {self.job_id}'

    def __eq__(self, other):
        return self.__dict__ == other.__dict__

    @staticmethod
    def from_dict(input_dict: dict):
        expiration_time = parse_date(input_dict['expiration_time']) if input_dict.get('expiration_time') else None
        return Job(
            job_type=input_dict['job_type'],
            job_id=input_dict['job_id'],
            request_time=parse_date(input_dict['request_time']),
            status_code=input_dict['status_code'],
            user_id=input_dict['user_id'],
            name=input_dict.get('name'),
            job_parameters=input_dict.get('job_parameters'),
            files=input_dict.get('files'),
            logs=input_dict.get('logs'),
            browse_images=input_dict.get('browse_images'),
            thumbnail_images=input_dict.get('thumbnail_images'),
            expiration_time=expiration_time,
            processing_time_in_seconds=input_dict.get('processing_time_in_seconds'),
        )

    def to_dict(self, for_resubmit: bool = False):
        job_dict = {}
        if for_resubmit:
            keys_to_process = Job._attributes_for_resubmit
        else:
            keys_to_process = vars(self).keys()

        for key in keys_to_process:
            value = self.__getattribute__(key)
            if value is not None:
                if isinstance(value, datetime):
                    job_dict[key] = value.isoformat(timespec='seconds')
                else:
                    job_dict[key] = value

        return job_dict

    def succeeded(self) -> bool:
        return self.status_code == 'SUCCEEDED'

    def failed(self) -> bool:
        return self.status_code == 'FAILED'

    def complete(self) -> bool:
        return self.succeeded() or self.failed()

    # TODO may want to update this to check if status code is actually RUNNING, because currently this also returns
    #  true if status is PENDING
    def running(self) -> bool:
        return not self.complete()

    def expired(self) -> bool:
        return self.expiration_time is not None and datetime.now(tz.UTC) >= self.expiration_time

    def download_files(self, location: Union[Path, str] = '.', create: bool = True) -> List[Path]:
        """
        Args:
            location: Directory location to put files into
            create: Create `location` if it does not point to an existing directory

        Returns: list of Path objects to downloaded files
        """
        location = Path(location)

        if not self.succeeded():
            raise HyP3SDKError(f'Only succeeded jobs can be downloaded; job is {self.status_code}.')
        if self.expired():
            raise HyP3SDKError(f'Expired jobs cannot be downloaded; '
                               f'job expired {self.expiration_time.isoformat(timespec="seconds")}.')

        if create:
            location.mkdir(parents=True, exist_ok=True)
        elif not location.is_dir():
            raise NotADirectoryError(str(location))

        downloaded_files = []
        for file in self.files:
            download_url = file['url']
            filename = location / file['filename']
            try:
                downloaded_files.append(download_file(download_url, filename, chunk_size=10485760))
            except HTTPError:
                raise HyP3SDKError(f'Unable to download file: {download_url}')
        return downloaded_files

download_files(self, location='.', create=True)

Parameters:

Name Type Description Default
location Union[pathlib.Path, str]

Directory location to put files into

'.'
create bool

Create location if it does not point to an existing directory

True

Returns: list of Path objects to downloaded files

Source code in hyp3_sdk/jobs.py
def download_files(self, location: Union[Path, str] = '.', create: bool = True) -> List[Path]:
    """
    Args:
        location: Directory location to put files into
        create: Create `location` if it does not point to an existing directory

    Returns: list of Path objects to downloaded files
    """
    location = Path(location)

    if not self.succeeded():
        raise HyP3SDKError(f'Only succeeded jobs can be downloaded; job is {self.status_code}.')
    if self.expired():
        raise HyP3SDKError(f'Expired jobs cannot be downloaded; '
                           f'job expired {self.expiration_time.isoformat(timespec="seconds")}.')

    if create:
        location.mkdir(parents=True, exist_ok=True)
    elif not location.is_dir():
        raise NotADirectoryError(str(location))

    downloaded_files = []
    for file in self.files:
        download_url = file['url']
        filename = location / file['filename']
        try:
            downloaded_files.append(download_file(download_url, filename, chunk_size=10485760))
        except HTTPError:
            raise HyP3SDKError(f'Unable to download file: {download_url}')
    return downloaded_files

util

Extra utilities for working with HyP3

chunk(itr, n=200)

Split a sequence into small chunks

Parameters:

Name Type Description Default
itr Sequence[Any]

A sequence object to chunk

required
n int

Size of the chunks to return

200
Source code in hyp3_sdk/util.py
def chunk(itr: Sequence[Any], n: int = 200) -> Generator[Sequence[Any], None, None]:
    """Split a sequence into small chunks

    Args:
        itr: A sequence object to chunk
        n: Size of the chunks to return
    """
    if not isinstance(n, int) or n < 1:
        raise ValueError(f'n must be a positive integer: {n}')

    for i in range(0, len(itr), n):
        yield itr[i:i + n]

download_file(url, filepath, chunk_size=None, retries=2, backoff_factor=1)

Download a file

Parameters:

Name Type Description Default
url str

URL of the file to download

required
filepath Union[pathlib.Path, str]

Location to place file into

required
chunk_size

Size to chunk the download into

None
retries

Number of retries to attempt

2
backoff_factor

Factor for calculating time between retries

1

Returns:

Type Description
download_path

The path to the downloaded file

Source code in hyp3_sdk/util.py
def download_file(url: str, filepath: Union[Path, str], chunk_size=None, retries=2, backoff_factor=1) -> Path:
    """Download a file
    Args:
        url: URL of the file to download
        filepath: Location to place file into
        chunk_size: Size to chunk the download into
        retries: Number of retries to attempt
        backoff_factor: Factor for calculating time between retries
    Returns:
        download_path: The path to the downloaded file
    """
    filepath = Path(filepath)
    session = requests.Session()
    retry_strategy = Retry(
        total=retries,
        backoff_factor=backoff_factor,
        status_forcelist=[429, 500, 502, 503, 504],
    )

    session.mount('https://', HTTPAdapter(max_retries=retry_strategy))
    session.mount('http://', HTTPAdapter(max_retries=retry_strategy))
    stream = False if chunk_size is None else True
    with session.get(url, stream=stream) as s:
        s.raise_for_status()
        tqdm = get_tqdm_progress_bar()
        with tqdm.wrapattr(open(filepath, "wb"), 'write', miniters=1, desc=filepath.name,
                           total=int(s.headers.get('content-length', 0))) as f:
            for chunk in s.iter_content(chunk_size=chunk_size):
                if chunk:
                    f.write(chunk)
    session.close()

    return filepath

extract_zipped_product(zip_file, delete=True)

Extract a zipped HyP3 product

Extract a zipped HyP3 product to the same directory as the zipped HyP3 product, optionally deleting zip file afterward.

Parameters:

Name Type Description Default
zip_file Union[str, pathlib.Path]

Zipped HyP3 product to extract

required
delete bool

Delete zip_file after it has been extracted

True

Returns:

Type Description
Path

Path to the HyP3 product folder containing the product files

Source code in hyp3_sdk/util.py
def extract_zipped_product(zip_file: Union[str, Path], delete: bool = True) -> Path:
    """Extract a zipped HyP3 product

    Extract a zipped HyP3 product to the same directory as the zipped HyP3 product, optionally
    deleting `zip file` afterward.

    Args:
        zip_file: Zipped HyP3 product to extract
        delete: Delete `zip_file` after it has been extracted

    Returns:
        Path to the HyP3 product folder containing the product files
    """
    zip_file = Path(zip_file)
    with ZipFile(zip_file) as z:
        z.extractall(path=zip_file.parent)

    if delete:
        zip_file.unlink()

    return zip_file.parent / zip_file.stem

get_authenticated_session(username, password)

Log into HyP3 using credentials for urs.earthdata.nasa.gov from either the provided credentials or a .netrc file.

Returns:

Type Description
Session

An authenticated HyP3 Session

Source code in hyp3_sdk/util.py
def get_authenticated_session(username: str, password: str) -> requests.Session:
    """Log into HyP3 using credentials for `urs.earthdata.nasa.gov` from either the provided
     credentials or a `.netrc` file.

    Returns:
        An authenticated HyP3 Session
    """
    s = requests.Session()
    if hyp3_sdk.TESTING:
        return s
    if username is not None and password is not None:
        response = s.get(AUTH_URL, auth=(username, password))
        try:
            response.raise_for_status()
        except requests.HTTPError:
            raise AuthenticationError('Was not able to authenticate with credentials provided\n'
                                      'This could be due to invalid credentials or a connection error.')
    else:
        response = s.get(AUTH_URL)
        try:
            response.raise_for_status()
        except requests.HTTPError:
            raise AuthenticationError('Was not able to authenticate with .netrc file and no credentials provided\n'
                                      'This could be due to invalid credentials in .netrc or a connection error.')
    return s