HyP3 SDK

A Python wrapper around the HyP3 API

>>> from hyp3_sdk import HyP3
>>> hyp3 = HyP3(username='MyUsername', password='MyPassword')  
>>> job = hyp3.submit_rtc_job(granule='S1A_IW_SLC__1SSV_20150621T120220_20150621T120232_006471_008934_72D8', name='MyNewJob')
>>> job = hyp3.watch(job)
>>> job.download_files()

Install

The HyP3 SDK can be installed via Anaconda/Miniconda:

conda install -c conda-forge hyp3_sdk

Or using pip:

python -m pip install hyp3_sdk

Quickstart

The SDK exposes three main classes:

  • HyP3: to perform HyP3 API operations (finding jobs, refreshing job information, submitting new jobs)
  • Job: to perform operations on single jobs (downloading products, checking status)
  • Batch: to perform operations on multiple jobs at once (downloading products, checking status)

An instance of the HyP3 class will be needed to interact with the external HyP3 API.

from hyp3_sdk import HyP3

# Must either have credentials for urs.earthdata.nasa.gov in a .netrc
hyp3 = HyP3()
# or provide them in the username and password keyword args
hyp3 = HyP3(username='MyUsername', password='MyPassword')

Submitting Jobs

The HyP3 instance has methods for submitting new jobs:

rtc_job = hyp3.submit_rtc_job('granule_id', 'job_name')
insar_job = hyp3.submit_insar_job('reference_granule_id', 'secondary_granule_id', 'job_name')
autorift_job = hyp3.submit_autorift_job('reference_granule_id', 'secondary_granule_id', 'job_name')

Each of these methods returns an instance of the Job class representing a new HyP3 job request.

Finding Existing Jobs

To find HyP3 jobs that were submitted previously, use hyp3.find_jobs():

batch = hyp3.find_jobs()

This will return a Batch instance representing all jobs owned by you. You can also pass parameters to query for a specific set of jobs.
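
For example, to narrow the search using the parameters documented in the SDK API Reference below (the values here are illustrative):

batch = hyp3.find_jobs(name='MyNewJob', status='SUCCEEDED')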

Operations on Job and Batch

If your jobs are not complete, you can use the HyP3 instance to refresh them or to wait for completion:

batch = hyp3.find_jobs()
if not batch.complete():
    # to get updated information
    batch = hyp3.refresh(batch)
    # or to wait until completion and get updated information (which may take a while)
    batch = hyp3.watch(batch)

Once your jobs are complete, you can download the products to your machine:

batch.download_files()

These operations also work on Job objects:

job = hyp3.submit_rtc_job('S1A_IW_SLC__1SSV_20150621T120220_20150621T120232_006471_008934_72D8', 'MyJobName')
job = hyp3.watch(job)
job.download_files()
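
If a batch contains a mix of job states, you can filter it before downloading. A minimal sketch using Batch.filter_jobs (documented in the SDK API Reference below); by default it keeps succeeded and running jobs, so here running and expired jobs are excluded as well:

batch = hyp3.find_jobs()
batch = hyp3.refresh(batch)
# keep only succeeded jobs whose products have not yet expired
downloadable = batch.filter_jobs(running=False, include_expired=False)
downloadable.download_files()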

SDK API Reference

A Python wrapper around the HyP3 API

exceptions

Errors and exceptions to raise when the SDK runs into problems

AuthenticationError

Raised when authentication does not succeed

HyP3Error

Base exception for the HyP3 SDK

ValidationError

Raised when jobs do not pass validation
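
A minimal sketch of handling these exceptions; the import path follows the exceptions module documented here, and the credential values are placeholders:

from hyp3_sdk import HyP3
from hyp3_sdk.exceptions import AuthenticationError, HyP3Error

try:
    hyp3 = HyP3(username='MyUsername', password='MyPassword')
    batch = hyp3.find_jobs()
except AuthenticationError as e:
    print(f'Could not authenticate with urs.earthdata.nasa.gov: {e}')
except HyP3Error as e:
    print(f'HyP3 API request failed: {e}')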

hyp3

HyP3

A Python wrapper around the HyP3 API

__init__(self, api_url='https://hyp3-api.asf.alaska.edu', username=None, password=None) special

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| api_url | str | Address of the HyP3 API | 'https://hyp3-api.asf.alaska.edu' |
| username | Optional | Username for authenticating to urs.earthdata.nasa.gov. Both username and password must be provided if either is provided. | None |
| password | Optional | Password for authenticating to urs.earthdata.nasa.gov. Both username and password must be provided if either is provided. | None |
Source code in hyp3_sdk/hyp3.py
def __init__(self, api_url: str = HYP3_PROD, username: Optional = None, password: Optional = None):
    """
    Args:
        api_url: Address of the HyP3 API
        username: Username for authenticating to urs.earthdata.nasa.gov.
            Both username and password must be provided if either is provided.
        password: Password for authenticating to urs.earthdata.nasa.gov.
           Both username and password must be provided if either is provided.
    """
    self.url = api_url
    self.session = get_authenticated_session(username, password)

check_quota(self)

Returns:

| Type | Description |
| --- | --- |
| int | The number of jobs left in your quota |

Source code in hyp3_sdk/hyp3.py
def check_quota(self) -> int:
    """
    Returns:
        The number of jobs left in your quota
    """
    info = self.my_info()
    return info['quota']['remaining']
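
A short usage sketch, assuming an authenticated HyP3 instance named hyp3:

remaining = hyp3.check_quota()
print(f'Jobs remaining in quota: {remaining}')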

find_jobs(self, start=None, end=None, status=None, name=None)

Gets a Batch of jobs from HyP3 matching the provided search criteria

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| start | Optional[datetime.datetime] | only jobs submitted after given time | None |
| end | Optional[datetime.datetime] | only jobs submitted before given time | None |
| status | Optional[str] | only jobs matching this status (SUCCEEDED, FAILED, RUNNING, PENDING) | None |
| name | Optional[str] | only jobs with this name | None |

Returns:

| Type | Description |
| --- | --- |
| Batch | A Batch object containing the found jobs |

Source code in hyp3_sdk/hyp3.py
def find_jobs(self, start: Optional[datetime] = None, end: Optional[datetime] = None,
              status: Optional[str] = None, name: Optional[str] = None) -> Batch:
    """Gets a Batch of jobs from HyP3 matching the provided search criteria

    Args:
        start: only jobs submitted after given time
        end: only jobs submitted before given time
        status: only jobs matching this status (SUCCEEDED, FAILED, RUNNING, PENDING)
        name: only jobs with this name

    Returns:
        A Batch object containing the found jobs
    """
    params = {}
    if name is not None:
        params['name'] = name
    if start is not None:
        params['start'] = start.isoformat(timespec='seconds')
        if start.tzinfo is None:
            params['start'] += 'Z'
    if end is not None:
        params['end'] = end.isoformat(timespec='seconds')
        if end.tzinfo is None:
            params['end'] += 'Z'
    if status is not None:
        params['status_code'] = status

    response = self.session.get(urljoin(self.url, '/jobs'), params=params)
    try:
        response.raise_for_status()
    except HTTPError:
        raise HyP3Error(f'Error while trying to query {response.url}')
    jobs = [Job.from_dict(job) for job in response.json()['jobs']]
    return Batch(jobs)
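
A usage sketch illustrating the time handling above: a naive datetime is serialized with a trailing 'Z', while a timezone-aware datetime keeps its own offset. The values are illustrative.

from datetime import datetime, timezone

# naive datetime: sent as start=2021-01-01T00:00:00Z
batch = hyp3.find_jobs(start=datetime(2021, 1, 1))

# timezone-aware datetimes: sent with their own offsets
batch = hyp3.find_jobs(
    start=datetime(2021, 1, 1, tzinfo=timezone.utc),
    end=datetime(2021, 2, 1, tzinfo=timezone.utc),
    status='SUCCEEDED',
)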

my_info(self)

Returns:

| Type | Description |
| --- | --- |
| dict | Your user information |

Source code in hyp3_sdk/hyp3.py
def my_info(self) -> dict:
    """
    Returns:
        Your user information
    """
    try:
        response = self.session.get(urljoin(self.url, '/user'))
        response.raise_for_status()
    except HTTPError:
        raise HyP3Error('Unable to get user information from API')
    return response.json()

refresh(self, job_or_batch)

Refresh each job's information

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| job_or_batch | Union[hyp3_sdk.jobs.Batch, hyp3_sdk.jobs.Job] | A Batch or Job object to refresh | required |

Returns:

| Type | Description |
| --- | --- |
| Union[hyp3_sdk.jobs.Batch, hyp3_sdk.jobs.Job] | A Batch or Job object with refreshed information |

Source code in hyp3_sdk/hyp3.py
@singledispatchmethod
def refresh(self, job_or_batch: Union[Batch, Job]) -> Union[Batch, Job]:
    """Refresh each jobs' information

    Args:
        job_or_batch: A Batch or Job object to refresh

    Returns:
        obj: A Batch or Job object with refreshed information
    """
    raise NotImplementedError(f'Cannot refresh {type(job_or_batch)} type object')

submit_autorift_job(self, granule1, granule2, name=None)

Submit an autoRIFT job

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| granule1 | str | The first granule (scene) to use | required |
| granule2 | str | The second granule (scene) to use | required |
| name | Optional[str] | A name for the job (must be <= 20 characters) | None |

Returns:

| Type | Description |
| --- | --- |
| Job | A Job object representing the autoRIFT job |

Source code in hyp3_sdk/hyp3.py
def submit_autorift_job(self, granule1: str, granule2: str, name: Optional[str] = None) -> Job:
    """Submit an autoRIFT job

    Args:
        granule1: The first granule (scene) to use
        granule2: The second granule (scene) to use
        name: A name for the job (must be <= 20 characters)

    Returns:
        A Job object representing the autoRIFT job
    """
    job_dict = {
        'job_parameters': {'granules': [granule1, granule2]},
        'job_type': 'AUTORIFT',
    }
    return self.submit_job_dict(job_dict=job_dict, name=name)
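
A usage sketch with placeholder granule names (substitute real Sentinel-1 granule IDs):

autorift_job = hyp3.submit_autorift_job(
    granule1='REFERENCE_GRANULE_ID',   # placeholder
    granule2='SECONDARY_GRANULE_ID',   # placeholder
    name='my-autorift',                # optional, must be <= 20 characters
)
autorift_job = hyp3.watch(autorift_job)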

submit_insar_job(self, granule1, granule2, name=None, **kwargs)

Submit an InSAR job

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| granule1 | str | The first granule (scene) to use | required |
| granule2 | str | The second granule (scene) to use | required |
| name | Optional[str] | A name for the job (must be <= 20 characters) | None |
| **kwargs | | Extra job parameters specifying custom processing options | {} |

Returns:

| Type | Description |
| --- | --- |
| Job | A Job object representing the InSAR job |

Source code in hyp3_sdk/hyp3.py
def submit_insar_job(self, granule1: str, granule2: str, name: Optional[str] = None, **kwargs) -> Job:
    """Submit an InSAR job

    Args:
        granule1: The first granule (scene) to use
        granule2: The second granule (scene) to use
        name: A name for the job (must be <= 20 characters)
        **kwargs: Extra job parameters specifying custom processing options

    Returns:
        A Job object representing the InSAR job
    """
    job_dict = {
        'job_parameters': {'granules': [granule1, granule2], **kwargs},
        'job_type': 'INSAR_GAMMA',
    }
    return self.submit_job_dict(job_dict=job_dict, name=name)
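
A hedged usage sketch: the keyword arguments are passed straight through as job parameters, so the accepted option names are defined by the HyP3 API's INSAR_GAMMA job specification rather than by this SDK; looks below is only an example and may not match your deployment.

insar_job = hyp3.submit_insar_job(
    granule1='REFERENCE_GRANULE_ID',   # placeholder granule names
    granule2='SECONDARY_GRANULE_ID',
    name='my-insar-pair',
    looks='20x4',  # example custom processing option; check the HyP3 API for valid parameters
)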

submit_rtc_job(self, granule, name=None, **kwargs)

Submit an RTC job

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| granule | str | The granule (scene) to use | required |
| name | Optional[str] | A name for the job (must be <= 20 characters) | None |
| **kwargs | | Extra job parameters specifying custom processing options | {} |

Returns:

| Type | Description |
| --- | --- |
| Job | A Job object representing the RTC job |

Source code in hyp3_sdk/hyp3.py
def submit_rtc_job(self, granule: str, name: Optional[str] = None, **kwargs) -> Job:
    """Submit an RTC job

    Args:
        granule: The granule (scene) to use
        name: A name for the job (must be <= 20 characters)
        **kwargs: Extra job parameters specifying custom processing options

    Returns:
        A Job object representing the RTC job
    """
    job_dict = {
        'job_parameters': {'granules': [granule], **kwargs},
        'job_type': 'RTC_GAMMA',
    }
    return self.submit_job_dict(job_dict=job_dict, name=name)
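
Similarly for RTC, a hedged sketch: resolution and radiometry are examples of custom processing options and should be checked against the HyP3 API's RTC_GAMMA job specification before use.

rtc_job = hyp3.submit_rtc_job(
    granule='S1A_IW_SLC__1SSV_20150621T120220_20150621T120232_006471_008934_72D8',
    name='MyNewJob',
    resolution=30,        # example option; verify against the RTC_GAMMA job spec
    radiometry='gamma0',  # example option; verify against the RTC_GAMMA job spec
)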

watch(self, job_or_batch, timeout=10800, interval=60)

Watch jobs until they complete

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| job_or_batch | Union[hyp3_sdk.jobs.Batch, hyp3_sdk.jobs.Job] | A Batch or Job object of jobs to watch | required |
| timeout | int | How long to wait until exiting in seconds | 10800 |
| interval | Union[int, float] | How often to check for updates in seconds | 60 |

Returns:

A Batch or Job object with refreshed watched jobs

Source code in hyp3_sdk/hyp3.py
def watch(self, job_or_batch: Union[Batch, Job], timeout: int = 10800, interval: Union[int, float] = 60):
    """Watch jobs until they complete

    Args:
        job_or_batch: A Batch or Job object of jobs to watch
        timeout: How long to wait until exiting in seconds
        interval: How often to check for updates in seconds

    Returns:
        A Batch or Job object with refreshed watched jobs
    """
    end_time = datetime.now() + timedelta(seconds=timeout)
    while datetime.now() < end_time:
        job_or_batch = self.refresh(job_or_batch)
        if job_or_batch.complete():
            return job_or_batch
        time.sleep(interval)
    raise HyP3Error('Timeout occurred while waiting for jobs')
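
A usage sketch with a shorter timeout and polling interval; note that a HyP3Error is raised if the jobs are still incomplete when the timeout is reached.

from hyp3_sdk.exceptions import HyP3Error

try:
    # poll every 2 minutes, give up after 1 hour
    job = hyp3.watch(job, timeout=3600, interval=120)
except HyP3Error:
    print('Jobs did not complete within an hour; try watching again later')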

jobs

Batch

any_expired(self)

Check succeeded jobs for expiration

Source code in hyp3_sdk/jobs.py
def any_expired(self) -> bool:
    """Check succeeded jobs for expiration"""
    for job in self.jobs:
        try:
            if job.expired():
                return True
        except HyP3Error:
            continue
    return False

complete(self)

Returns: True if all jobs are complete, otherwise returns False

Source code in hyp3_sdk/jobs.py
def complete(self) -> bool:
    """
    Returns: True if all jobs are complete, otherwise returns False
    """
    for job in self.jobs:
        if not job.complete():
            return False
    return True

download_files(self, location='')

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| location | Union[pathlib.Path, str] | Directory location to put files into | '' |

Returns: A list of Path objects for the downloaded files

Source code in hyp3_sdk/jobs.py
def download_files(self, location: Union[Path, str] = '') -> List[Path]:
    """
    Args:
        location: Directory location to put files into

    Returns: list of Path objects to downloaded files
    """
    if not self.complete():
        raise HyP3Error('Incomplete jobs cannot be downloaded')
    downloaded_files = []
    for job in self.jobs:
        downloaded_files.extend(job.download_files(location))
    return downloaded_files
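
A usage sketch that downloads a completed batch into a local directory; the directory is created first, since the download helper writes files directly into it:

from pathlib import Path

products_dir = Path('hyp3_products')
products_dir.mkdir(exist_ok=True)

if batch.complete():
    product_files = batch.download_files(location=products_dir)
    print(f'Downloaded {len(product_files)} files to {products_dir}')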

filter_jobs(self, succeeded=True, running=True, failed=False, include_expired=True)

Filter jobs by status. By default, only succeeded and still running jobs will be in the returned batch.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| succeeded | bool | Include all succeeded jobs | True |
| running | bool | Include all running jobs | True |
| failed | bool | Include all failed jobs | False |
| include_expired | bool | Include expired jobs in the result | True |

Returns:

| Type | Description |
| --- | --- |
| Batch | A Batch object containing jobs matching all the selected statuses |

Source code in hyp3_sdk/jobs.py
def filter_jobs(
        self, succeeded: bool = True, running: bool = True, failed: bool = False, include_expired: bool = True,
) -> 'Batch':
    """Filter jobs by status. By default, only succeeded and still running jobs will be in the returned batch.

    Args:
        succeeded: Include all succeeded jobs
        running: Include all running jobs
        failed: Include all failed jobs
        include_expired: Include expired jobs in the result


    Returns:
         batch: A batch object containing jobs matching all the selected statuses
    """
    filtered_jobs = []

    for job in self.jobs:
        if job.succeeded() and succeeded:
            if include_expired or not job.expired():
                filtered_jobs.append(job)

        elif job.running() and running:
            filtered_jobs.append(job)

        elif job.failed() and failed:
            filtered_jobs.append(job)

    return Batch(filtered_jobs)
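
A usage sketch that separates a refreshed batch into downloadable and failed jobs:

batch = hyp3.refresh(batch)

# succeeded, non-expired jobs are safe to download
downloadable = batch.filter_jobs(running=False, include_expired=False)

# failed jobs only, e.g. to inspect or resubmit
failed = batch.filter_jobs(succeeded=False, running=False, failed=True)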

succeeded(self)

Returns: True if all jobs have succeeded, otherwise returns False

Source code in hyp3_sdk/jobs.py
def succeeded(self) -> bool:
    """
    Returns: True if all jobs have succeeded, otherwise returns False
    """
    for job in self.jobs:
        if not job.succeeded():
            return False
    return True

Job

download_files(self, location='')

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| location | Union[pathlib.Path, str] | Directory location to put files into | '' |

Returns: A list of Path objects for the downloaded files

Source code in hyp3_sdk/jobs.py
def download_files(self, location: Union[Path, str] = '') -> List[Path]:
    """
    Args:
        location: Directory location to put files into

    Returns: list of Path objects to downloaded files
    """
    location = Path(location)
    if not self.complete():
        raise HyP3Error('Incomplete jobs cannot be downloaded')
    downloaded_files = []
    for file in self.files:
        download_url = file['url']
        filename = location / file['filename']
        try:
            downloaded_files.append(download_file(download_url, filename))
        except Exception:
            raise HyP3Error('unable to download file')
    return downloaded_files

util

Extra utilities for working with HyP3

download_file(url, filepath, chunk_size=None, retries=2, backoff_factor=1)

Download a file

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| url | str | URL of the file to download | required |
| filepath | Union[pathlib.Path, str] | Location to place file into | required |
| chunk_size | | Size to chunk the download into | None |
| retries | | Number of retries to attempt | 2 |
| backoff_factor | | Factor for calculating time between retries | 1 |

Returns:

| Type | Description |
| --- | --- |
| Path | The path to the downloaded file |

Source code in hyp3_sdk/util.py
def download_file(url: str, filepath: Union[Path, str], chunk_size=None, retries=2, backoff_factor=1) -> Path:
    """Download a file
    Args:
        url: URL of the file to download
        filepath: Location to place file into
        chunk_size: Size to chunk the download into
        retries: Number of retries to attempt
        backoff_factor: Factor for calculating time between retries
    Returns:
        download_path: The path to the downloaded file
    """
    filepath = Path(filepath)
    session = requests.Session()
    retry_strategy = Retry(
        total=retries,
        backoff_factor=backoff_factor,
        status_forcelist=[429, 500, 502, 503, 504],
    )

    session.mount('https://', HTTPAdapter(max_retries=retry_strategy))
    session.mount('http://', HTTPAdapter(max_retries=retry_strategy))

    with session.get(url, stream=True) as s:
        s.raise_for_status()
        with open(filepath, "wb") as f:
            for chunk in s.iter_content(chunk_size=chunk_size):
                if chunk:
                    f.write(chunk)
    session.close()

    return filepath
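
A usage sketch with an illustrative URL, saving to the current working directory:

from hyp3_sdk.util import download_file

# stream the download in roughly 10 MB chunks, retrying on transient server errors
product_path = download_file(
    'https://example.com/path/to/product.zip',
    'product.zip',
    chunk_size=10_485_760,
)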

get_authenticated_session(username, password)

Logs into HyP3 using credentials for urs.earthdata.nasa.gov, from the provided credentials or a .netrc file.

Returns:

| Type | Description |
| --- | --- |
| Session | An authenticated Session object from the requests library |

Source code in hyp3_sdk/util.py
def get_authenticated_session(username: str, password: str) -> requests.Session:
    """logs into hyp3 using credentials for urs.earthdata.nasa.gov from provided credentails or a .netrc file.

    Returns:
        An authenticated Session object from the requests library
    """
    s = requests.Session()
    if hyp3_sdk.TESTING:
        return s
    if (username and password) is not None:
        try:
            response = s.get(AUTH_URL, auth=(username, password))
            response.raise_for_status()
        except requests.HTTPError:
            raise AuthenticationError('Was not able to authenticate with credentials provided\n'
                                      'This could be due to invalid credentials or a connection error.')
    else:
        try:
            response = s.get(AUTH_URL)
            response.raise_for_status()
        except requests.HTTPError:
            raise AuthenticationError('Was not able to authenticate with .netrc file and no credentials provided\n'
                                      'This could be due to invalid credentials in .netrc or a connection error.')
    return s