Skip to content

hyp3_sdk v1.6.1 API Reference

A python wrapper around the HyP3 API

Batch

Source code in hyp3_sdk/jobs.py
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
class Batch:
    def __init__(self, jobs: Optional[List[Job]] = None):
        if jobs is None:
            jobs = []
        self.jobs = jobs

    def __add__(self, other: Union[Job, 'Batch']):
        if isinstance(other, Batch):
            return Batch(self.jobs + other.jobs)
        elif isinstance(other, Job):
            return Batch(self.jobs + [other])
        else:
            raise TypeError(f"unsupported operand type(s) for +: '{type(self)}' and '{type(other)}'")

    def __iadd__(self, other: Union[Job, 'Batch']):
        if isinstance(other, Batch):
            self.jobs += other.jobs
        elif isinstance(other, Job):
            self.jobs += [other]
        else:
            raise TypeError(f"unsupported operand type(s) for +=: '{type(self)}' and '{type(other)}'")
        return self

    def __iter__(self):
        return iter(self.jobs)

    def __len__(self):
        return len(self.jobs)

    def __contains__(self, job: Job):
        return job in self.jobs

    def __eq__(self, other: 'Batch'):
        return self.jobs == other.jobs

    def __delitem__(self, job: int):
        self.jobs.pop(job)
        return self

    def __getitem__(self, index: int):
        if isinstance(index, slice):
            return Batch(self.jobs[index])
        return self.jobs[index]

    def __setitem__(self, index: int, job: Job):
        self.jobs[index] = job
        return self

    def __repr__(self):
        reprs = ", ".join([job.__repr__() for job in self.jobs])
        return f'Batch([{reprs}])'

    def __str__(self):
        count = self._count_statuses()
        return f'{len(self)} HyP3 Jobs: ' \
               f'{count["SUCCEEDED"]} succeeded, ' \
               f'{count["FAILED"]} failed, ' \
               f'{count["RUNNING"]} running, ' \
               f'{count["PENDING"]} pending.'

    def _count_statuses(self):
        return Counter([job.status_code for job in self.jobs])

    def complete(self) -> bool:
        """
        Returns: True if all jobs are complete, otherwise returns False
        """
        for job in self.jobs:
            if not job.complete():
                return False
        return True

    def succeeded(self) -> bool:
        """
        Returns: True if all jobs have succeeded, otherwise returns False
        """
        for job in self.jobs:
            if not job.succeeded():
                return False
        return True

    def download_files(self, location: Union[Path, str] = '.', create: bool = True) -> List[Path]:
        """
        Args:
            location: Directory location to put files into
            create: Create `location` if it does not point to an existing directory

        Returns: list of Path objects to downloaded files
        """
        downloaded_files = []
        tqdm = get_tqdm_progress_bar()
        for job in tqdm(self.jobs):
            try:
                downloaded_files.extend(job.download_files(location, create))
            except HyP3SDKError as e:
                print(f'Warning: {e}. Skipping download for {job}.')
        return downloaded_files

    def any_expired(self) -> bool:
        """Check succeeded jobs for expiration"""
        for job in self.jobs:
            try:
                if job.expired():
                    return True
            except HyP3SDKError:
                continue
        return False

    def filter_jobs(
            self, succeeded: bool = True, running: bool = True, failed: bool = False, include_expired: bool = True,
    ) -> 'Batch':
        """Filter jobs by status. By default, only succeeded and still running jobs will be in the returned batch.

        Args:
            succeeded: Include all succeeded jobs
            running: Include all running jobs
            failed: Include all failed jobs
            include_expired: Include expired jobs in the result


        Returns:
             batch: A batch object containing jobs matching all the selected statuses
        """
        filtered_jobs = []

        for job in self.jobs:
            if job.succeeded() and succeeded:
                if include_expired or not job.expired():
                    filtered_jobs.append(job)

            elif job.running() and running:
                filtered_jobs.append(job)

            elif job.failed() and failed:
                filtered_jobs.append(job)

        return Batch(filtered_jobs)

any_expired()

Check succeeded jobs for expiration

Source code in hyp3_sdk/jobs.py
244
245
246
247
248
249
250
251
252
def any_expired(self) -> bool:
    """Check succeeded jobs for expiration"""
    for job in self.jobs:
        try:
            if job.expired():
                return True
        except HyP3SDKError:
            continue
    return False

complete()

Source code in hyp3_sdk/jobs.py
209
210
211
212
213
214
215
216
def complete(self) -> bool:
    """
    Returns: True if all jobs are complete, otherwise returns False
    """
    for job in self.jobs:
        if not job.complete():
            return False
    return True

download_files(location='.', create=True)

Parameters:

Name Type Description Default
location Union[Path, str]

Directory location to put files into

'.'
create bool

Create location if it does not point to an existing directory

True
Source code in hyp3_sdk/jobs.py
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
def download_files(self, location: Union[Path, str] = '.', create: bool = True) -> List[Path]:
    """
    Args:
        location: Directory location to put files into
        create: Create `location` if it does not point to an existing directory

    Returns: list of Path objects to downloaded files
    """
    downloaded_files = []
    tqdm = get_tqdm_progress_bar()
    for job in tqdm(self.jobs):
        try:
            downloaded_files.extend(job.download_files(location, create))
        except HyP3SDKError as e:
            print(f'Warning: {e}. Skipping download for {job}.')
    return downloaded_files

filter_jobs(succeeded=True, running=True, failed=False, include_expired=True)

Filter jobs by status. By default, only succeeded and still running jobs will be in the returned batch.

Parameters:

Name Type Description Default
succeeded bool

Include all succeeded jobs

True
running bool

Include all running jobs

True
failed bool

Include all failed jobs

False
include_expired bool

Include expired jobs in the result

True

Returns:

Name Type Description
batch Batch

A batch object containing jobs matching all the selected statuses

Source code in hyp3_sdk/jobs.py
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
def filter_jobs(
        self, succeeded: bool = True, running: bool = True, failed: bool = False, include_expired: bool = True,
) -> 'Batch':
    """Filter jobs by status. By default, only succeeded and still running jobs will be in the returned batch.

    Args:
        succeeded: Include all succeeded jobs
        running: Include all running jobs
        failed: Include all failed jobs
        include_expired: Include expired jobs in the result


    Returns:
         batch: A batch object containing jobs matching all the selected statuses
    """
    filtered_jobs = []

    for job in self.jobs:
        if job.succeeded() and succeeded:
            if include_expired or not job.expired():
                filtered_jobs.append(job)

        elif job.running() and running:
            filtered_jobs.append(job)

        elif job.failed() and failed:
            filtered_jobs.append(job)

    return Batch(filtered_jobs)

succeeded()

Source code in hyp3_sdk/jobs.py
218
219
220
221
222
223
224
225
def succeeded(self) -> bool:
    """
    Returns: True if all jobs have succeeded, otherwise returns False
    """
    for job in self.jobs:
        if not job.succeeded():
            return False
    return True

HyP3

A python wrapper around the HyP3 API

Source code in hyp3_sdk/hyp3.py
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
class HyP3:
    """A python wrapper around the HyP3 API"""

    def __init__(self, api_url: str = PROD_API, username: Optional[str] = None, password: Optional[str] = None,
                 prompt: bool = False):
        """
        Args:
            api_url: Address of the HyP3 API
            username: Username for authenticating to `urs.earthdata.nasa.gov`.
                Both username and password must be provided if either is provided.
            password: Password for authenticating to `urs.earthdata.nasa.gov`.
                Both username and password must be provided if either is provided.
            prompt: Prompt for username and/or password interactively when they
                are not provided as keyword parameters
        """
        self.url = api_url

        if username is None and prompt:
            username = input('NASA Earthdata Login username: ')
        if password is None and prompt:
            password = getpass('NASA Earthdata Login password: ')

        self.session = get_authenticated_session(username, password)
        self.session.headers.update({'User-Agent': f'{hyp3_sdk.__name__}/{hyp3_sdk.__version__}'})

    def find_jobs(self, start: Optional[datetime] = None, end: Optional[datetime] = None,
                  status_code: Optional[str] = None, name: Optional[str] = None,
                  job_type: Optional[str] = None) -> Batch:
        """Gets a Batch of jobs from HyP3 matching the provided search criteria

        Args:
            start: only jobs submitted after given time
            end: only jobs submitted before given time
            status_code: only jobs matching this status (SUCCEEDED, FAILED, RUNNING, PENDING)
            name: only jobs with this name
            job_type: only jobs with this job_type

        Returns:
            A Batch object containing the found jobs
        """
        params = {}
        for param_name in ('start', 'end', 'status_code', 'name', 'job_type'):
            param_value = locals().get(param_name)
            if param_value is not None:
                if isinstance(param_value, datetime):
                    if param_value.tzinfo is None:
                        param_value = param_value.replace(tzinfo=timezone.utc)
                    param_value = param_value.isoformat(timespec='seconds')

                params[param_name] = param_value

        response = self.session.get(urljoin(self.url, '/jobs'), params=params)
        _raise_for_hyp3_status(response)
        jobs = [Job.from_dict(job) for job in response.json()['jobs']]

        while 'next' in response.json():
            next_url = response.json()['next']
            response = self.session.get(next_url)
            _raise_for_hyp3_status(response)
            jobs.extend([Job.from_dict(job) for job in response.json()['jobs']])

        return Batch(jobs)

    def get_job_by_id(self, job_id: str) -> Job:
        """Get job by job ID

        Args:
            job_id: A job ID

        Returns:
            A Job object
        """
        response = self.session.get(urljoin(self.url, f'/jobs/{job_id}'))
        _raise_for_hyp3_status(response)

        return Job.from_dict(response.json())

    @singledispatchmethod
    def watch(self, job_or_batch: Union[Batch, Job], timeout: int = 10800,
              interval: Union[int, float] = 60) -> Union[Batch, Job]:
        """Watch jobs until they complete

        Args:
            job_or_batch: A Batch or Job object of jobs to watch
            timeout: How long to wait until exiting in seconds
            interval: How often to check for updates in seconds

        Returns:
            A Batch or Job object with refreshed watched jobs
        """
        raise NotImplementedError(f'Cannot watch {type(job_or_batch)} type object')

    @watch.register
    def _watch_batch(self, batch: Batch, timeout: int = 10800, interval: Union[int, float] = 60) -> Batch:
        tqdm = get_tqdm_progress_bar()
        iterations_until_timeout = math.ceil(timeout / interval)
        bar_format = '{l_bar}{bar}| {n_fmt}/{total_fmt} [{postfix[0]}]'
        with tqdm(total=len(batch), bar_format=bar_format, postfix=[f'timeout in {timeout} s']) as progress_bar:
            for ii in range(iterations_until_timeout):
                batch = self.refresh(batch)

                counts = batch._count_statuses()
                complete = counts['SUCCEEDED'] + counts['FAILED']

                progress_bar.postfix = [f'timeout in {timeout - ii * interval}s']
                # to control n/total manually; update is n += value
                progress_bar.n = complete
                progress_bar.update(0)

                if batch.complete():
                    return batch
                time.sleep(interval)
        raise HyP3Error(f'Timeout occurred while waiting for {batch}')

    @watch.register
    def _watch_job(self, job: Job, timeout: int = 10800, interval: Union[int, float] = 60) -> Job:
        tqdm = get_tqdm_progress_bar()
        iterations_until_timeout = math.ceil(timeout / interval)
        bar_format = '{n_fmt}/{total_fmt} [{postfix[0]}]'
        with tqdm(total=1, bar_format=bar_format, postfix=[f'timeout in {timeout} s']) as progress_bar:
            for ii in range(iterations_until_timeout):
                job = self.refresh(job)
                progress_bar.postfix = [f'timeout in {timeout - ii * interval}s']
                progress_bar.update(int(job.complete()))

                if job.complete():
                    return job
                time.sleep(interval)
        raise HyP3Error(f'Timeout occurred while waiting for {job}')

    @singledispatchmethod
    def refresh(self, job_or_batch: Union[Batch, Job]) -> Union[Batch, Job]:
        """Refresh each jobs' information

        Args:
            job_or_batch: A Batch of Job object to refresh

        Returns:
            A Batch or Job object with refreshed information
        """
        raise NotImplementedError(f'Cannot refresh {type(job_or_batch)} type object')

    @refresh.register
    def _refresh_batch(self, batch: Batch):
        jobs = []
        for job in batch.jobs:
            jobs.append(self.refresh(job))
        return Batch(jobs)

    @refresh.register
    def _refresh_job(self, job: Job):
        return self.get_job_by_id(job.job_id)

    def submit_prepared_jobs(self, prepared_jobs: Union[dict, List[dict]]) -> Batch:
        """Submit a prepared job dictionary, or list of prepared job dictionaries

        Args:
            prepared_jobs: A prepared job dictionary, or list of prepared job dictionaries

        Returns:
            A Batch object containing the submitted job(s)
        """
        if isinstance(prepared_jobs, dict):
            payload = {'jobs': [prepared_jobs]}
        else:
            payload = {'jobs': prepared_jobs}

        response = self.session.post(urljoin(self.url, '/jobs'), json=payload)
        _raise_for_hyp3_status(response)

        batch = Batch()
        for job in response.json()['jobs']:
            batch += Job.from_dict(job)
        return batch

    def submit_autorift_job(self, granule1: str, granule2: str, name: Optional[str] = None) -> Batch:
        """Submit an autoRIFT job

        Args:
            granule1: The first granule (scene) to use
            granule2: The second granule (scene) to use
            name: A name for the job

        Returns:
            A Batch object containing the autoRIFT job
        """
        job_dict = self.prepare_autorift_job(granule1, granule2, name=name)
        return self.submit_prepared_jobs(prepared_jobs=job_dict)

    @classmethod
    def prepare_autorift_job(cls, granule1: str, granule2: str, name: Optional[str] = None) -> dict:
        """Submit an autoRIFT job

        Args:
            granule1: The first granule (scene) to use
            granule2: The second granule (scene) to use
            name: A name for the job

        Returns:
            A dictionary containing the prepared autoRIFT job
        """
        job_dict = {
            'job_parameters': {'granules': [granule1, granule2]},
            'job_type': 'AUTORIFT',
        }
        if name is not None:
            job_dict['name'] = name
        return job_dict

    def submit_rtc_job(self,
                       granule: str,
                       name: Optional[str] = None,
                       dem_matching: bool = False,
                       include_dem: bool = False,
                       include_inc_map: bool = False,
                       include_rgb: bool = False,
                       include_scattering_area: bool = False,
                       radiometry: Literal['sigma0', 'gamma0'] = 'gamma0',
                       resolution: Literal[30] = 30,
                       scale: Literal['amplitude', 'power'] = 'power',
                       speckle_filter: bool = False,
                       dem_name: Literal['copernicus', 'legacy'] = 'copernicus') -> Batch:
        """Submit an RTC job

        Args:
            granule: The granule (scene) to use
            name: A name for the job
            dem_matching: Coregisters SAR data to the DEM, rather than using dead reckoning based on orbit files
            include_dem: Include the DEM file in the product package
            include_inc_map: Include the local incidence angle map in the product package
            include_rgb: Include a false-color RGB decomposition in the product package for dual-pol granules
                (ignored for single-pol granules)
            include_scattering_area: Include the scattering area in the product package
            radiometry: Backscatter coefficient normalization, either by ground area (sigma0) or illuminated area
                projected into the look direction (gamma0)
            resolution: Desired output pixel spacing in meters
            scale: Scale of output image; either power or amplitude
            speckle_filter: Apply an Enhanced Lee speckle filter
            dem_name: Name of the DEM to use for processing.  `copernicus` will use the Copernicus GLO-30 Public DEM,
                while `legacy` will use the DEM with the best coverage from ASF's legacy SRTM/NED datasets.

        Returns:
            A Batch object containing the RTC job
        """
        arguments = locals()
        arguments.pop('self')
        job_dict = self.prepare_rtc_job(**arguments)
        return self.submit_prepared_jobs(prepared_jobs=job_dict)

    @classmethod
    def prepare_rtc_job(cls,
                        granule: str,
                        name: Optional[str] = None,
                        dem_matching: bool = False,
                        include_dem: bool = False,
                        include_inc_map: bool = False,
                        include_rgb: bool = False,
                        include_scattering_area: bool = False,
                        radiometry: Literal['sigma0', 'gamma0'] = 'gamma0',
                        resolution: Literal[30] = 30,
                        scale: Literal['amplitude', 'power'] = 'power',
                        speckle_filter: bool = False,
                        dem_name: Literal['copernicus', 'legacy'] = 'copernicus') -> dict:
        """Submit an RTC job

        Args:
            granule: The granule (scene) to use
            name: A name for the job
            dem_matching: Coregisters SAR data to the DEM, rather than using dead reckoning based on orbit files
            include_dem: Include the DEM file in the product package
            include_inc_map: Include the local incidence angle map in the product package
            include_rgb: Include a false-color RGB decomposition in the product package for dual-pol granules
                (ignored for single-pol granules)
            include_scattering_area: Include the scattering area in the product package
            radiometry: Backscatter coefficient normalization, either by ground area (sigma0) or illuminated area
                projected into the look direction (gamma0)
            resolution: Desired output pixel spacing in meters
            scale: Scale of output image; either power or amplitude
            speckle_filter: Apply an Enhanced Lee speckle filter
            dem_name: Name of the DEM to use for processing.  `copernicus` will use the Copernicus GLO-30 Public DEM,
                while `legacy` will use the DEM with the best coverage from ASF's legacy SRTM/NED datasets.

        Returns:
            A dictionary containing the prepared RTC job
        """
        job_parameters = locals().copy()
        for key in ['granule', 'name', 'cls']:
            job_parameters.pop(key, None)

        job_dict = {
            'job_parameters': {'granules': [granule], **job_parameters},
            'job_type': 'RTC_GAMMA',
        }

        if name is not None:
            job_dict['name'] = name
        return job_dict

    def submit_insar_job(self,
                         granule1: str,
                         granule2: str,
                         name: Optional[str] = None,
                         include_look_vectors: bool = False,
                         include_los_displacement: bool = False,
                         include_inc_map: bool = False,
                         looks: Literal['20x4', '10x2'] = '20x4',
                         include_dem: bool = False,
                         include_wrapped_phase: bool = False,
                         apply_water_mask: bool = False,
                         include_displacement_maps: bool = False) -> Batch:
        """Submit an InSAR job

        Args:
            granule1: The first granule (scene) to use
            granule2: The second granule (scene) to use
            name: A name for the job
            include_look_vectors: Include the look vector theta and phi files in the product package
            include_los_displacement: Include a GeoTIFF in the product package containing displacement values
                along the Line-Of-Sight (LOS). This parameter has been deprecated in favor of
                `include_displacement_maps`, and will be removed in a future release.
            include_inc_map: Include the local and ellipsoidal incidence angle maps in the product package
            looks: Number of looks to take in range and azimuth
            include_dem: Include the digital elevation model GeoTIFF in the product package
            include_wrapped_phase: Include the wrapped phase GeoTIFF in the product package
            apply_water_mask: Sets pixels over coastal waters and large inland waterbodies
                as invalid for phase unwrapping
            include_displacement_maps: Include displacement maps (line-of-sight and vertical) in the product package

        Returns:
            A Batch object containing the InSAR job
        """
        arguments = locals().copy()
        arguments.pop('self')
        job_dict = self.prepare_insar_job(**arguments)
        return self.submit_prepared_jobs(prepared_jobs=job_dict)

    @classmethod
    def prepare_insar_job(cls,
                          granule1: str,
                          granule2: str,
                          name: Optional[str] = None,
                          include_look_vectors: bool = False,
                          include_los_displacement: bool = False,
                          include_inc_map: bool = False,
                          looks: Literal['20x4', '10x2'] = '20x4',
                          include_dem: bool = False,
                          include_wrapped_phase: bool = False,
                          apply_water_mask: bool = False,
                          include_displacement_maps: bool = False) -> dict:
        """Submit an InSAR job

        Args:
            granule1: The first granule (scene) to use
            granule2: The second granule (scene) to use
            name: A name for the job
            include_look_vectors: Include the look vector theta and phi files in the product package
            include_los_displacement: Include a GeoTIFF in the product package containing displacement values
                along the Line-Of-Sight (LOS). This parameter has been deprecated in favor of
                `include_displacement_maps`, and will be removed in a future release.
            include_inc_map: Include the local and ellipsoidal incidence angle maps in the product package
            looks: Number of looks to take in range and azimuth
            include_dem: Include the digital elevation model GeoTIFF in the product package
            include_wrapped_phase: Include the wrapped phase GeoTIFF in the product package
            apply_water_mask: Sets pixels over coastal waters and large inland waterbodies
                as invalid for phase unwrapping
            include_displacement_maps: Include displacement maps (line-of-sight and vertical) in the product package
        Returns:
            A dictionary containing the prepared InSAR job
        """
        if include_los_displacement:
            warnings.warn('The include_los_displacement parameter has been deprecated in favor of '
                          'include_displacement_maps, and will be removed in a future release.', FutureWarning)

        job_parameters = locals().copy()
        for key in ['cls', 'granule1', 'granule2', 'name']:
            job_parameters.pop(key)

        job_dict = {
            'job_parameters': {'granules': [granule1, granule2], **job_parameters},
            'job_type': 'INSAR_GAMMA',
        }
        if name is not None:
            job_dict['name'] = name
        return job_dict

    def my_info(self) -> dict:
        """
        Returns:
            Your user information
        """
        response = self.session.get(urljoin(self.url, '/user'))
        _raise_for_hyp3_status(response)
        return response.json()

    def check_quota(self) -> Optional[int]:
        """
        Returns:
            The number of jobs left in your quota, or None if you have no quota
        """
        info = self.my_info()
        return info['quota']['remaining']

__init__(api_url=PROD_API, username=None, password=None, prompt=False)

Parameters:

Name Type Description Default
api_url str

Address of the HyP3 API

PROD_API
username Optional[str]

Username for authenticating to urs.earthdata.nasa.gov. Both username and password must be provided if either is provided.

None
password Optional[str]

Password for authenticating to urs.earthdata.nasa.gov. Both username and password must be provided if either is provided.

None
prompt bool

Prompt for username and/or password interactively when they are not provided as keyword parameters

False
Source code in hyp3_sdk/hyp3.py
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
def __init__(self, api_url: str = PROD_API, username: Optional[str] = None, password: Optional[str] = None,
             prompt: bool = False):
    """
    Args:
        api_url: Address of the HyP3 API
        username: Username for authenticating to `urs.earthdata.nasa.gov`.
            Both username and password must be provided if either is provided.
        password: Password for authenticating to `urs.earthdata.nasa.gov`.
            Both username and password must be provided if either is provided.
        prompt: Prompt for username and/or password interactively when they
            are not provided as keyword parameters
    """
    self.url = api_url

    if username is None and prompt:
        username = input('NASA Earthdata Login username: ')
    if password is None and prompt:
        password = getpass('NASA Earthdata Login password: ')

    self.session = get_authenticated_session(username, password)
    self.session.headers.update({'User-Agent': f'{hyp3_sdk.__name__}/{hyp3_sdk.__version__}'})

check_quota()

Returns:

Type Description
Optional[int]

The number of jobs left in your quota, or None if you have no quota

Source code in hyp3_sdk/hyp3.py
413
414
415
416
417
418
419
def check_quota(self) -> Optional[int]:
    """
    Returns:
        The number of jobs left in your quota, or None if you have no quota
    """
    info = self.my_info()
    return info['quota']['remaining']

find_jobs(start=None, end=None, status_code=None, name=None, job_type=None)

Gets a Batch of jobs from HyP3 matching the provided search criteria

Parameters:

Name Type Description Default
start Optional[datetime]

only jobs submitted after given time

None
end Optional[datetime]

only jobs submitted before given time

None
status_code Optional[str]

only jobs matching this status (SUCCEEDED, FAILED, RUNNING, PENDING)

None
name Optional[str]

only jobs with this name

None
job_type Optional[str]

only jobs with this job_type

None

Returns:

Type Description
Batch

A Batch object containing the found jobs

Source code in hyp3_sdk/hyp3.py
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
def find_jobs(self, start: Optional[datetime] = None, end: Optional[datetime] = None,
              status_code: Optional[str] = None, name: Optional[str] = None,
              job_type: Optional[str] = None) -> Batch:
    """Gets a Batch of jobs from HyP3 matching the provided search criteria

    Args:
        start: only jobs submitted after given time
        end: only jobs submitted before given time
        status_code: only jobs matching this status (SUCCEEDED, FAILED, RUNNING, PENDING)
        name: only jobs with this name
        job_type: only jobs with this job_type

    Returns:
        A Batch object containing the found jobs
    """
    params = {}
    for param_name in ('start', 'end', 'status_code', 'name', 'job_type'):
        param_value = locals().get(param_name)
        if param_value is not None:
            if isinstance(param_value, datetime):
                if param_value.tzinfo is None:
                    param_value = param_value.replace(tzinfo=timezone.utc)
                param_value = param_value.isoformat(timespec='seconds')

            params[param_name] = param_value

    response = self.session.get(urljoin(self.url, '/jobs'), params=params)
    _raise_for_hyp3_status(response)
    jobs = [Job.from_dict(job) for job in response.json()['jobs']]

    while 'next' in response.json():
        next_url = response.json()['next']
        response = self.session.get(next_url)
        _raise_for_hyp3_status(response)
        jobs.extend([Job.from_dict(job) for job in response.json()['jobs']])

    return Batch(jobs)

get_job_by_id(job_id)

Get job by job ID

Parameters:

Name Type Description Default
job_id str

A job ID

required

Returns:

Type Description
Job

A Job object

Source code in hyp3_sdk/hyp3.py
82
83
84
85
86
87
88
89
90
91
92
93
94
def get_job_by_id(self, job_id: str) -> Job:
    """Get job by job ID

    Args:
        job_id: A job ID

    Returns:
        A Job object
    """
    response = self.session.get(urljoin(self.url, f'/jobs/{job_id}'))
    _raise_for_hyp3_status(response)

    return Job.from_dict(response.json())

my_info()

Returns:

Type Description
dict

Your user information

Source code in hyp3_sdk/hyp3.py
404
405
406
407
408
409
410
411
def my_info(self) -> dict:
    """
    Returns:
        Your user information
    """
    response = self.session.get(urljoin(self.url, '/user'))
    _raise_for_hyp3_status(response)
    return response.json()

prepare_autorift_job(granule1, granule2, name=None) classmethod

Submit an autoRIFT job

Parameters:

Name Type Description Default
granule1 str

The first granule (scene) to use

required
granule2 str

The second granule (scene) to use

required
name Optional[str]

A name for the job

None

Returns:

Type Description
dict

A dictionary containing the prepared autoRIFT job

Source code in hyp3_sdk/hyp3.py
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
@classmethod
def prepare_autorift_job(cls, granule1: str, granule2: str, name: Optional[str] = None) -> dict:
    """Submit an autoRIFT job

    Args:
        granule1: The first granule (scene) to use
        granule2: The second granule (scene) to use
        name: A name for the job

    Returns:
        A dictionary containing the prepared autoRIFT job
    """
    job_dict = {
        'job_parameters': {'granules': [granule1, granule2]},
        'job_type': 'AUTORIFT',
    }
    if name is not None:
        job_dict['name'] = name
    return job_dict

prepare_insar_job(granule1, granule2, name=None, include_look_vectors=False, include_los_displacement=False, include_inc_map=False, looks='20x4', include_dem=False, include_wrapped_phase=False, apply_water_mask=False, include_displacement_maps=False) classmethod

Submit an InSAR job

Parameters:

Name Type Description Default
granule1 str

The first granule (scene) to use

required
granule2 str

The second granule (scene) to use

required
name Optional[str]

A name for the job

None
include_look_vectors bool

Include the look vector theta and phi files in the product package

False
include_los_displacement bool

Include a GeoTIFF in the product package containing displacement values along the Line-Of-Sight (LOS). This parameter has been deprecated in favor of include_displacement_maps, and will be removed in a future release.

False
include_inc_map bool

Include the local and ellipsoidal incidence angle maps in the product package

False
looks Literal['20x4', '10x2']

Number of looks to take in range and azimuth

'20x4'
include_dem bool

Include the digital elevation model GeoTIFF in the product package

False
include_wrapped_phase bool

Include the wrapped phase GeoTIFF in the product package

False
apply_water_mask bool

Sets pixels over coastal waters and large inland waterbodies as invalid for phase unwrapping

False
include_displacement_maps bool

Include displacement maps (line-of-sight and vertical) in the product package

False

Returns:

Type Description
dict

A dictionary containing the prepared InSAR job

Source code in hyp3_sdk/hyp3.py
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
@classmethod
def prepare_insar_job(cls,
                      granule1: str,
                      granule2: str,
                      name: Optional[str] = None,
                      include_look_vectors: bool = False,
                      include_los_displacement: bool = False,
                      include_inc_map: bool = False,
                      looks: Literal['20x4', '10x2'] = '20x4',
                      include_dem: bool = False,
                      include_wrapped_phase: bool = False,
                      apply_water_mask: bool = False,
                      include_displacement_maps: bool = False) -> dict:
    """Submit an InSAR job

    Args:
        granule1: The first granule (scene) to use
        granule2: The second granule (scene) to use
        name: A name for the job
        include_look_vectors: Include the look vector theta and phi files in the product package
        include_los_displacement: Include a GeoTIFF in the product package containing displacement values
            along the Line-Of-Sight (LOS). This parameter has been deprecated in favor of
            `include_displacement_maps`, and will be removed in a future release.
        include_inc_map: Include the local and ellipsoidal incidence angle maps in the product package
        looks: Number of looks to take in range and azimuth
        include_dem: Include the digital elevation model GeoTIFF in the product package
        include_wrapped_phase: Include the wrapped phase GeoTIFF in the product package
        apply_water_mask: Sets pixels over coastal waters and large inland waterbodies
            as invalid for phase unwrapping
        include_displacement_maps: Include displacement maps (line-of-sight and vertical) in the product package
    Returns:
        A dictionary containing the prepared InSAR job
    """
    if include_los_displacement:
        warnings.warn('The include_los_displacement parameter has been deprecated in favor of '
                      'include_displacement_maps, and will be removed in a future release.', FutureWarning)

    job_parameters = locals().copy()
    for key in ['cls', 'granule1', 'granule2', 'name']:
        job_parameters.pop(key)

    job_dict = {
        'job_parameters': {'granules': [granule1, granule2], **job_parameters},
        'job_type': 'INSAR_GAMMA',
    }
    if name is not None:
        job_dict['name'] = name
    return job_dict

prepare_rtc_job(granule, name=None, dem_matching=False, include_dem=False, include_inc_map=False, include_rgb=False, include_scattering_area=False, radiometry='gamma0', resolution=30, scale='power', speckle_filter=False, dem_name='copernicus') classmethod

Submit an RTC job

Parameters:

Name Type Description Default
granule str

The granule (scene) to use

required
name Optional[str]

A name for the job

None
dem_matching bool

Coregisters SAR data to the DEM, rather than using dead reckoning based on orbit files

False
include_dem bool

Include the DEM file in the product package

False
include_inc_map bool

Include the local incidence angle map in the product package

False
include_rgb bool

Include a false-color RGB decomposition in the product package for dual-pol granules (ignored for single-pol granules)

False
include_scattering_area bool

Include the scattering area in the product package

False
radiometry Literal['sigma0', 'gamma0']

Backscatter coefficient normalization, either by ground area (sigma0) or illuminated area projected into the look direction (gamma0)

'gamma0'
resolution Literal[30]

Desired output pixel spacing in meters

30
scale Literal['amplitude', 'power']

Scale of output image; either power or amplitude

'power'
speckle_filter bool

Apply an Enhanced Lee speckle filter

False
dem_name Literal['copernicus', 'legacy']

Name of the DEM to use for processing. copernicus will use the Copernicus GLO-30 Public DEM, while legacy will use the DEM with the best coverage from ASF's legacy SRTM/NED datasets.

'copernicus'

Returns:

Type Description
dict

A dictionary containing the prepared RTC job

Source code in hyp3_sdk/hyp3.py
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
@classmethod
def prepare_rtc_job(cls,
                    granule: str,
                    name: Optional[str] = None,
                    dem_matching: bool = False,
                    include_dem: bool = False,
                    include_inc_map: bool = False,
                    include_rgb: bool = False,
                    include_scattering_area: bool = False,
                    radiometry: Literal['sigma0', 'gamma0'] = 'gamma0',
                    resolution: Literal[30] = 30,
                    scale: Literal['amplitude', 'power'] = 'power',
                    speckle_filter: bool = False,
                    dem_name: Literal['copernicus', 'legacy'] = 'copernicus') -> dict:
    """Submit an RTC job

    Args:
        granule: The granule (scene) to use
        name: A name for the job
        dem_matching: Coregisters SAR data to the DEM, rather than using dead reckoning based on orbit files
        include_dem: Include the DEM file in the product package
        include_inc_map: Include the local incidence angle map in the product package
        include_rgb: Include a false-color RGB decomposition in the product package for dual-pol granules
            (ignored for single-pol granules)
        include_scattering_area: Include the scattering area in the product package
        radiometry: Backscatter coefficient normalization, either by ground area (sigma0) or illuminated area
            projected into the look direction (gamma0)
        resolution: Desired output pixel spacing in meters
        scale: Scale of output image; either power or amplitude
        speckle_filter: Apply an Enhanced Lee speckle filter
        dem_name: Name of the DEM to use for processing.  `copernicus` will use the Copernicus GLO-30 Public DEM,
            while `legacy` will use the DEM with the best coverage from ASF's legacy SRTM/NED datasets.

    Returns:
        A dictionary containing the prepared RTC job
    """
    job_parameters = locals().copy()
    for key in ['granule', 'name', 'cls']:
        job_parameters.pop(key, None)

    job_dict = {
        'job_parameters': {'granules': [granule], **job_parameters},
        'job_type': 'RTC_GAMMA',
    }

    if name is not None:
        job_dict['name'] = name
    return job_dict

refresh(job_or_batch)

Refresh each jobs' information

Parameters:

Name Type Description Default
job_or_batch Union[Batch, Job]

A Batch of Job object to refresh

required

Returns:

Type Description
Union[Batch, Job]

A Batch or Job object with refreshed information

Source code in hyp3_sdk/hyp3.py
149
150
151
152
153
154
155
156
157
158
159
@singledispatchmethod
def refresh(self, job_or_batch: Union[Batch, Job]) -> Union[Batch, Job]:
    """Refresh each jobs' information

    Args:
        job_or_batch: A Batch of Job object to refresh

    Returns:
        A Batch or Job object with refreshed information
    """
    raise NotImplementedError(f'Cannot refresh {type(job_or_batch)} type object')

submit_autorift_job(granule1, granule2, name=None)

Submit an autoRIFT job

Parameters:

Name Type Description Default
granule1 str

The first granule (scene) to use

required
granule2 str

The second granule (scene) to use

required
name Optional[str]

A name for the job

None

Returns:

Type Description
Batch

A Batch object containing the autoRIFT job

Source code in hyp3_sdk/hyp3.py
194
195
196
197
198
199
200
201
202
203
204
205
206
def submit_autorift_job(self, granule1: str, granule2: str, name: Optional[str] = None) -> Batch:
    """Submit an autoRIFT job

    Args:
        granule1: The first granule (scene) to use
        granule2: The second granule (scene) to use
        name: A name for the job

    Returns:
        A Batch object containing the autoRIFT job
    """
    job_dict = self.prepare_autorift_job(granule1, granule2, name=name)
    return self.submit_prepared_jobs(prepared_jobs=job_dict)

submit_insar_job(granule1, granule2, name=None, include_look_vectors=False, include_los_displacement=False, include_inc_map=False, looks='20x4', include_dem=False, include_wrapped_phase=False, apply_water_mask=False, include_displacement_maps=False)

Submit an InSAR job

Parameters:

Name Type Description Default
granule1 str

The first granule (scene) to use

required
granule2 str

The second granule (scene) to use

required
name Optional[str]

A name for the job

None
include_look_vectors bool

Include the look vector theta and phi files in the product package

False
include_los_displacement bool

Include a GeoTIFF in the product package containing displacement values along the Line-Of-Sight (LOS). This parameter has been deprecated in favor of include_displacement_maps, and will be removed in a future release.

False
include_inc_map bool

Include the local and ellipsoidal incidence angle maps in the product package

False
looks Literal['20x4', '10x2']

Number of looks to take in range and azimuth

'20x4'
include_dem bool

Include the digital elevation model GeoTIFF in the product package

False
include_wrapped_phase bool

Include the wrapped phase GeoTIFF in the product package

False
apply_water_mask bool

Sets pixels over coastal waters and large inland waterbodies as invalid for phase unwrapping

False
include_displacement_maps bool

Include displacement maps (line-of-sight and vertical) in the product package

False

Returns:

Type Description
Batch

A Batch object containing the InSAR job

Source code in hyp3_sdk/hyp3.py
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
def submit_insar_job(self,
                     granule1: str,
                     granule2: str,
                     name: Optional[str] = None,
                     include_look_vectors: bool = False,
                     include_los_displacement: bool = False,
                     include_inc_map: bool = False,
                     looks: Literal['20x4', '10x2'] = '20x4',
                     include_dem: bool = False,
                     include_wrapped_phase: bool = False,
                     apply_water_mask: bool = False,
                     include_displacement_maps: bool = False) -> Batch:
    """Submit an InSAR job

    Args:
        granule1: The first granule (scene) to use
        granule2: The second granule (scene) to use
        name: A name for the job
        include_look_vectors: Include the look vector theta and phi files in the product package
        include_los_displacement: Include a GeoTIFF in the product package containing displacement values
            along the Line-Of-Sight (LOS). This parameter has been deprecated in favor of
            `include_displacement_maps`, and will be removed in a future release.
        include_inc_map: Include the local and ellipsoidal incidence angle maps in the product package
        looks: Number of looks to take in range and azimuth
        include_dem: Include the digital elevation model GeoTIFF in the product package
        include_wrapped_phase: Include the wrapped phase GeoTIFF in the product package
        apply_water_mask: Sets pixels over coastal waters and large inland waterbodies
            as invalid for phase unwrapping
        include_displacement_maps: Include displacement maps (line-of-sight and vertical) in the product package

    Returns:
        A Batch object containing the InSAR job
    """
    arguments = locals().copy()
    arguments.pop('self')
    job_dict = self.prepare_insar_job(**arguments)
    return self.submit_prepared_jobs(prepared_jobs=job_dict)

submit_prepared_jobs(prepared_jobs)

Submit a prepared job dictionary, or list of prepared job dictionaries

Parameters:

Name Type Description Default
prepared_jobs Union[dict, List[dict]]

A prepared job dictionary, or list of prepared job dictionaries

required

Returns:

Type Description
Batch

A Batch object containing the submitted job(s)

Source code in hyp3_sdk/hyp3.py
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
def submit_prepared_jobs(self, prepared_jobs: Union[dict, List[dict]]) -> Batch:
    """Submit a prepared job dictionary, or list of prepared job dictionaries

    Args:
        prepared_jobs: A prepared job dictionary, or list of prepared job dictionaries

    Returns:
        A Batch object containing the submitted job(s)
    """
    if isinstance(prepared_jobs, dict):
        payload = {'jobs': [prepared_jobs]}
    else:
        payload = {'jobs': prepared_jobs}

    response = self.session.post(urljoin(self.url, '/jobs'), json=payload)
    _raise_for_hyp3_status(response)

    batch = Batch()
    for job in response.json()['jobs']:
        batch += Job.from_dict(job)
    return batch

submit_rtc_job(granule, name=None, dem_matching=False, include_dem=False, include_inc_map=False, include_rgb=False, include_scattering_area=False, radiometry='gamma0', resolution=30, scale='power', speckle_filter=False, dem_name='copernicus')

Submit an RTC job

Parameters:

Name Type Description Default
granule str

The granule (scene) to use

required
name Optional[str]

A name for the job

None
dem_matching bool

Coregisters SAR data to the DEM, rather than using dead reckoning based on orbit files

False
include_dem bool

Include the DEM file in the product package

False
include_inc_map bool

Include the local incidence angle map in the product package

False
include_rgb bool

Include a false-color RGB decomposition in the product package for dual-pol granules (ignored for single-pol granules)

False
include_scattering_area bool

Include the scattering area in the product package

False
radiometry Literal['sigma0', 'gamma0']

Backscatter coefficient normalization, either by ground area (sigma0) or illuminated area projected into the look direction (gamma0)

'gamma0'
resolution Literal[30]

Desired output pixel spacing in meters

30
scale Literal['amplitude', 'power']

Scale of output image; either power or amplitude

'power'
speckle_filter bool

Apply an Enhanced Lee speckle filter

False
dem_name Literal['copernicus', 'legacy']

Name of the DEM to use for processing. copernicus will use the Copernicus GLO-30 Public DEM, while legacy will use the DEM with the best coverage from ASF's legacy SRTM/NED datasets.

'copernicus'

Returns:

Type Description
Batch

A Batch object containing the RTC job

Source code in hyp3_sdk/hyp3.py
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
def submit_rtc_job(self,
                   granule: str,
                   name: Optional[str] = None,
                   dem_matching: bool = False,
                   include_dem: bool = False,
                   include_inc_map: bool = False,
                   include_rgb: bool = False,
                   include_scattering_area: bool = False,
                   radiometry: Literal['sigma0', 'gamma0'] = 'gamma0',
                   resolution: Literal[30] = 30,
                   scale: Literal['amplitude', 'power'] = 'power',
                   speckle_filter: bool = False,
                   dem_name: Literal['copernicus', 'legacy'] = 'copernicus') -> Batch:
    """Submit an RTC job

    Args:
        granule: The granule (scene) to use
        name: A name for the job
        dem_matching: Coregisters SAR data to the DEM, rather than using dead reckoning based on orbit files
        include_dem: Include the DEM file in the product package
        include_inc_map: Include the local incidence angle map in the product package
        include_rgb: Include a false-color RGB decomposition in the product package for dual-pol granules
            (ignored for single-pol granules)
        include_scattering_area: Include the scattering area in the product package
        radiometry: Backscatter coefficient normalization, either by ground area (sigma0) or illuminated area
            projected into the look direction (gamma0)
        resolution: Desired output pixel spacing in meters
        scale: Scale of output image; either power or amplitude
        speckle_filter: Apply an Enhanced Lee speckle filter
        dem_name: Name of the DEM to use for processing.  `copernicus` will use the Copernicus GLO-30 Public DEM,
            while `legacy` will use the DEM with the best coverage from ASF's legacy SRTM/NED datasets.

    Returns:
        A Batch object containing the RTC job
    """
    arguments = locals()
    arguments.pop('self')
    job_dict = self.prepare_rtc_job(**arguments)
    return self.submit_prepared_jobs(prepared_jobs=job_dict)

watch(job_or_batch, timeout=10800, interval=60)

Watch jobs until they complete

Parameters:

Name Type Description Default
job_or_batch Union[Batch, Job]

A Batch or Job object of jobs to watch

required
timeout int

How long to wait until exiting in seconds

10800
interval Union[int, float]

How often to check for updates in seconds

60

Returns:

Type Description
Union[Batch, Job]

A Batch or Job object with refreshed watched jobs

Source code in hyp3_sdk/hyp3.py
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
@singledispatchmethod
def watch(self, job_or_batch: Union[Batch, Job], timeout: int = 10800,
          interval: Union[int, float] = 60) -> Union[Batch, Job]:
    """Watch jobs until they complete

    Args:
        job_or_batch: A Batch or Job object of jobs to watch
        timeout: How long to wait until exiting in seconds
        interval: How often to check for updates in seconds

    Returns:
        A Batch or Job object with refreshed watched jobs
    """
    raise NotImplementedError(f'Cannot watch {type(job_or_batch)} type object')

Job

Source code in hyp3_sdk/jobs.py
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
class Job:
    _attributes_for_resubmit = {'name', 'job_parameters', 'job_type'}

    def __init__(
            self,
            job_type: str,
            job_id: str,
            request_time: datetime,
            status_code: str,
            user_id: str,
            subscription_id: Optional[str] = None,
            name: Optional[str] = None,
            job_parameters: Optional[dict] = None,
            files: Optional[List] = None,
            logs: Optional[List] = None,
            browse_images: Optional[List] = None,
            thumbnail_images: Optional[List] = None,
            expiration_time: Optional[datetime] = None,
            processing_time_in_seconds: Optional[int] = None,
    ):
        self.job_id = job_id
        self.job_type = job_type
        self.request_time = request_time
        self.status_code = status_code
        self.user_id = user_id
        self.subscription_id = subscription_id
        self.name = name
        self.job_parameters = job_parameters
        self.files = files
        self.logs = logs
        self.browse_images = browse_images
        self.thumbnail_images = thumbnail_images
        self.expiration_time = expiration_time
        self.processing_time_in_seconds = processing_time_in_seconds

    def __repr__(self):
        return f'Job.from_dict({self.to_dict()})'

    def __str__(self):
        return f'HyP3 {self.job_type} job {self.job_id}'

    def __eq__(self, other):
        return self.__dict__ == other.__dict__

    @staticmethod
    def from_dict(input_dict: dict):
        expiration_time = parse_date(input_dict['expiration_time']) if input_dict.get('expiration_time') else None
        return Job(
            job_type=input_dict['job_type'],
            job_id=input_dict['job_id'],
            request_time=parse_date(input_dict['request_time']),
            status_code=input_dict['status_code'],
            user_id=input_dict['user_id'],
            subscription_id=input_dict.get('subscription_id'),
            name=input_dict.get('name'),
            job_parameters=input_dict.get('job_parameters'),
            files=input_dict.get('files'),
            logs=input_dict.get('logs'),
            browse_images=input_dict.get('browse_images'),
            thumbnail_images=input_dict.get('thumbnail_images'),
            expiration_time=expiration_time,
            processing_time_in_seconds=input_dict.get('processing_time_in_seconds'),
        )

    def to_dict(self, for_resubmit: bool = False):
        job_dict = {}
        if for_resubmit:
            keys_to_process = Job._attributes_for_resubmit
        else:
            keys_to_process = vars(self).keys()

        for key in keys_to_process:
            value = self.__getattribute__(key)
            if value is not None:
                if isinstance(value, datetime):
                    job_dict[key] = value.isoformat(timespec='seconds')
                else:
                    job_dict[key] = value

        return job_dict

    def succeeded(self) -> bool:
        return self.status_code == 'SUCCEEDED'

    def failed(self) -> bool:
        return self.status_code == 'FAILED'

    def complete(self) -> bool:
        return self.succeeded() or self.failed()

    # TODO may want to update this to check if status code is actually RUNNING, because currently this also returns
    #  true if status is PENDING
    def running(self) -> bool:
        return not self.complete()

    def expired(self) -> bool:
        return self.expiration_time is not None and datetime.now(tz.UTC) >= self.expiration_time

    def download_files(self, location: Union[Path, str] = '.', create: bool = True) -> List[Path]:
        """
        Args:
            location: Directory location to put files into
            create: Create `location` if it does not point to an existing directory

        Returns: list of Path objects to downloaded files
        """
        location = Path(location)

        if not self.succeeded():
            raise HyP3SDKError(f'Only succeeded jobs can be downloaded; job is {self.status_code}.')
        if self.expired():
            raise HyP3SDKError(f'Expired jobs cannot be downloaded; '
                               f'job expired {self.expiration_time.isoformat(timespec="seconds")}.')

        if create:
            location.mkdir(parents=True, exist_ok=True)
        elif not location.is_dir():
            raise NotADirectoryError(str(location))

        downloaded_files = []
        for file in self.files:
            download_url = file['url']
            filename = location / file['filename']
            try:
                downloaded_files.append(download_file(download_url, filename, chunk_size=10485760))
            except HTTPError:
                raise HyP3SDKError(f'Unable to download file: {download_url}')
        return downloaded_files

download_files(location='.', create=True)

Parameters:

Name Type Description Default
location Union[Path, str]

Directory location to put files into

'.'
create bool

Create location if it does not point to an existing directory

True
Source code in hyp3_sdk/jobs.py
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
def download_files(self, location: Union[Path, str] = '.', create: bool = True) -> List[Path]:
    """
    Args:
        location: Directory location to put files into
        create: Create `location` if it does not point to an existing directory

    Returns: list of Path objects to downloaded files
    """
    location = Path(location)

    if not self.succeeded():
        raise HyP3SDKError(f'Only succeeded jobs can be downloaded; job is {self.status_code}.')
    if self.expired():
        raise HyP3SDKError(f'Expired jobs cannot be downloaded; '
                           f'job expired {self.expiration_time.isoformat(timespec="seconds")}.')

    if create:
        location.mkdir(parents=True, exist_ok=True)
    elif not location.is_dir():
        raise NotADirectoryError(str(location))

    downloaded_files = []
    for file in self.files:
        download_url = file['url']
        filename = location / file['filename']
        try:
            downloaded_files.append(download_file(download_url, filename, chunk_size=10485760))
        except HTTPError:
            raise HyP3SDKError(f'Unable to download file: {download_url}')
    return downloaded_files

util

Extra utilities for working with HyP3

chunk(itr, n=200)

Split a sequence into small chunks

Parameters:

Name Type Description Default
itr Sequence[Any]

A sequence object to chunk

required
n int

Size of the chunks to return

200
Source code in hyp3_sdk/util.py
40
41
42
43
44
45
46
47
48
49
50
51
def chunk(itr: Sequence[Any], n: int = 200) -> Generator[Sequence[Any], None, None]:
    """Split a sequence into small chunks

    Args:
        itr: A sequence object to chunk
        n: Size of the chunks to return
    """
    if not isinstance(n, int) or n < 1:
        raise ValueError(f'n must be a positive integer: {n}')

    for i in range(0, len(itr), n):
        yield itr[i:i + n]

download_file(url, filepath, chunk_size=None, retries=2, backoff_factor=1)

Download a file

Parameters:

Name Type Description Default
url str

URL of the file to download

required
filepath Union[Path, str]

Location to place file into

required
chunk_size

Size to chunk the download into

None
retries

Number of retries to attempt

2
backoff_factor

Factor for calculating time between retries

1

Returns:

Name Type Description
download_path Path

The path to the downloaded file

Source code in hyp3_sdk/util.py
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
def download_file(url: str, filepath: Union[Path, str], chunk_size=None, retries=2, backoff_factor=1) -> Path:
    """Download a file
    Args:
        url: URL of the file to download
        filepath: Location to place file into
        chunk_size: Size to chunk the download into
        retries: Number of retries to attempt
        backoff_factor: Factor for calculating time between retries
    Returns:
        download_path: The path to the downloaded file
    """
    filepath = Path(filepath)
    session = requests.Session()
    retry_strategy = Retry(
        total=retries,
        backoff_factor=backoff_factor,
        status_forcelist=[429, 500, 502, 503, 504],
    )

    session.mount('https://', HTTPAdapter(max_retries=retry_strategy))
    session.mount('http://', HTTPAdapter(max_retries=retry_strategy))
    stream = False if chunk_size is None else True
    with session.get(url, stream=stream) as s:
        s.raise_for_status()
        tqdm = get_tqdm_progress_bar()
        with tqdm.wrapattr(open(filepath, "wb"), 'write', miniters=1, desc=filepath.name,
                           total=int(s.headers.get('content-length', 0))) as f:
            for chunk in s.iter_content(chunk_size=chunk_size):
                if chunk:
                    f.write(chunk)
    session.close()

    return filepath

extract_zipped_product(zip_file, delete=True)

Extract a zipped HyP3 product

Extract a zipped HyP3 product to the same directory as the zipped HyP3 product, optionally deleting zip file afterward.

Parameters:

Name Type Description Default
zip_file Union[str, Path]

Zipped HyP3 product to extract

required
delete bool

Delete zip_file after it has been extracted

True

Returns:

Type Description
Path

Path to the HyP3 product folder containing the product files

Source code in hyp3_sdk/util.py
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
def extract_zipped_product(zip_file: Union[str, Path], delete: bool = True) -> Path:
    """Extract a zipped HyP3 product

    Extract a zipped HyP3 product to the same directory as the zipped HyP3 product, optionally
    deleting `zip file` afterward.

    Args:
        zip_file: Zipped HyP3 product to extract
        delete: Delete `zip_file` after it has been extracted

    Returns:
        Path to the HyP3 product folder containing the product files
    """
    zip_file = Path(zip_file)
    with ZipFile(zip_file) as z:
        z.extractall(path=zip_file.parent)

    if delete:
        zip_file.unlink()

    return zip_file.parent / zip_file.stem

get_authenticated_session(username, password)

Log into HyP3 using credentials for urs.earthdata.nasa.gov from either the provided credentials or a .netrc file.

Returns:

Type Description
requests.Session

An authenticated HyP3 Session

Source code in hyp3_sdk/util.py
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
def get_authenticated_session(username: str, password: str) -> requests.Session:
    """Log into HyP3 using credentials for `urs.earthdata.nasa.gov` from either the provided
     credentials or a `.netrc` file.

    Returns:
        An authenticated HyP3 Session
    """
    s = requests.Session()
    if hyp3_sdk.TESTING:
        return s
    if username is not None and password is not None:
        response = s.get(AUTH_URL, auth=(username, password))
        try:
            response.raise_for_status()
        except requests.HTTPError:
            raise AuthenticationError('Was not able to authenticate with credentials provided\n'
                                      'This could be due to invalid credentials or a connection error.')
    else:
        response = s.get(AUTH_URL)
        try:
            response.raise_for_status()
        except requests.HTTPError:
            raise AuthenticationError('Was not able to authenticate with .netrc file and no credentials provided\n'
                                      'This could be due to invalid credentials in .netrc or a connection error.')
    return s