Skip to content

hyp3_sdk v6.1.0 API Reference

hyp3_sdk

A Python wrapper around the HyP3 API

Batch

Source code in hyp3_sdk/jobs.py
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
class Batch:
    """A list-like collection of jobs.

    Supports `+` / `+=` with a `Job` or another `Batch`, iteration, `len()`,
    membership tests, indexing and slicing, plus helpers that aggregate over
    the contained jobs.
    """

    def __init__(self, jobs: Optional[List['Job']] = None):
        """
        Args:
            jobs: Jobs to hold in the batch. An empty batch is created when omitted.
                The list is stored as given, not copied.
        """
        if jobs is None:
            jobs = []
        self.jobs = jobs

    def __add__(self, other: Union['Job', 'Batch']):
        """Return a new Batch holding this batch's jobs followed by `other`."""
        if isinstance(other, Batch):
            return Batch(self.jobs + other.jobs)
        elif isinstance(other, Job):
            return Batch(self.jobs + [other])
        else:
            raise TypeError(f"unsupported operand type(s) for +: '{type(self)}' and '{type(other)}'")

    def __iadd__(self, other: Union['Job', 'Batch']):
        """Append `other` (a Job, or every job of a Batch) to this batch in place."""
        if isinstance(other, Batch):
            self.jobs += other.jobs
        elif isinstance(other, Job):
            self.jobs += [other]
        else:
            raise TypeError(f"unsupported operand type(s) for +=: '{type(self)}' and '{type(other)}'")
        return self

    def __iter__(self):
        return iter(self.jobs)

    def __len__(self):
        return len(self.jobs)

    def __contains__(self, job: 'Job'):
        return job in self.jobs

    def __eq__(self, other: object):
        """Two batches are equal when their job lists are equal."""
        # Returning NotImplemented (rather than touching `other.jobs`) lets Python
        # fall back to its default comparison instead of raising AttributeError
        # when a Batch is compared with an unrelated object.
        if not isinstance(other, Batch):
            return NotImplemented
        return self.jobs == other.jobs

    def __delitem__(self, job: Union[int, slice]):
        """Remove the job(s) at the given index or slice."""
        # `del` on the list accepts slices as well as integers, matching __getitem__.
        del self.jobs[job]
        return self

    def __getitem__(self, index: Union[int, slice]):
        """Return the job at `index`, or a new Batch when `index` is a slice."""
        if isinstance(index, slice):
            return Batch(self.jobs[index])
        return self.jobs[index]

    def __setitem__(self, index: int, job: 'Job'):
        self.jobs[index] = job
        return self

    def __repr__(self):
        reprs = ", ".join(repr(job) for job in self.jobs)
        return f'Batch([{reprs}])'

    def __str__(self):
        count = self._count_statuses()
        return f'{len(self)} HyP3 Jobs: ' \
               f'{count["SUCCEEDED"]} succeeded, ' \
               f'{count["FAILED"]} failed, ' \
               f'{count["RUNNING"]} running, ' \
               f'{count["PENDING"]} pending.'

    def _count_statuses(self):
        """Return a Counter mapping each `status_code` to the number of jobs having it."""
        return Counter(job.status_code for job in self.jobs)

    def complete(self) -> bool:
        """
        Returns: True if all jobs are complete, otherwise returns False
        """
        return all(job.complete() for job in self.jobs)

    def succeeded(self) -> bool:
        """
        Returns: True if all jobs have succeeded, otherwise returns False
        """
        return all(job.succeeded() for job in self.jobs)

    def download_files(self, location: Union[Path, str] = '.', create: bool = True) -> List[Path]:
        """
        Args:
            location: Directory location to put files into
            create: Create `location` if it does not point to an existing directory

        Returns: list of Path objects to downloaded files
        """
        downloaded_files = []
        tqdm = get_tqdm_progress_bar()
        for job in tqdm(self.jobs):
            try:
                downloaded_files.extend(job.download_files(location, create))
            except HyP3SDKError as e:
                # Best effort: report the problem and carry on with the remaining jobs.
                print(f'Warning: {e} Skipping download for {job}.')
        return downloaded_files

    def any_expired(self) -> bool:
        """Check succeeded jobs for expiration

        Returns: True if at least one job reports itself as expired, otherwise False
        """
        for job in self.jobs:
            try:
                if job.expired():
                    return True
            except HyP3SDKError:
                # A job whose expiration check raises is treated as not expired.
                continue
        return False

    def filter_jobs(
        self,
        succeeded: bool = True,
        pending: bool = True,
        running: bool = True,
        failed: bool = False,
        include_expired: bool = True,
    ) -> 'Batch':
        """Filter jobs by status. By default, only succeeded, pending,
        and still running jobs will be in the returned batch.

        Args:
            succeeded: Include all succeeded jobs
            pending: Include all pending jobs
            running: Include all running jobs
            failed: Include all failed jobs
            include_expired: Include expired jobs in the result


        Returns:
             batch: A batch object containing jobs matching any of the selected statuses
        """
        filtered_jobs = []

        for job in self.jobs:
            if job.succeeded() and succeeded:
                # Expiration is only consulted for selected, succeeded jobs.
                if include_expired or not job.expired():
                    filtered_jobs.append(job)

            elif job.running() and running:
                filtered_jobs.append(job)

            elif job.pending() and pending:
                filtered_jobs.append(job)

            elif job.failed() and failed:
                filtered_jobs.append(job)

        return Batch(filtered_jobs)

    def total_credit_cost(self):
        """
        Returns: Sum of `credit_cost` over all jobs, ignoring jobs whose cost is None
        """
        return sum(job.credit_cost for job in self.jobs if job.credit_cost is not None)

any_expired()

Check succeeded jobs for expiration

Source code in hyp3_sdk/jobs.py
245
246
247
248
249
250
251
252
253
def any_expired(self) -> bool:
    """Check succeeded jobs for expiration

    Returns: True as soon as one job reports itself as expired, otherwise False
    """
    def _is_expired(candidate) -> bool:
        # A job whose expiration check raises HyP3SDKError counts as not expired.
        try:
            return bool(candidate.expired())
        except HyP3SDKError:
            return False

    return any(_is_expired(job) for job in self.jobs)

complete()

Returns: True if all jobs are complete, otherwise returns False

Source code in hyp3_sdk/jobs.py
210
211
212
213
214
215
216
217
def complete(self) -> bool:
    """
    Returns: True when every job in the batch is complete (also for an empty batch), otherwise False
    """
    return all(job.complete() for job in self.jobs)

download_files(location='.', create=True)

Parameters:

Name Type Description Default
location Union[Path, str]

Directory location to put files into

'.'
create bool

Create location if it does not point to an existing directory

True

Returns: list of Path objects to downloaded files

Source code in hyp3_sdk/jobs.py
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
def download_files(self, location: Union[Path, str] = '.', create: bool = True) -> List[Path]:
    """Download the files of every job in the batch.

    Jobs whose download raises `HyP3SDKError` are reported with a printed warning and skipped.

    Args:
        location: Directory location to put files into
        create: Create `location` if it does not point to an existing directory

    Returns: list of Path objects to downloaded files
    """
    progress = get_tqdm_progress_bar()
    collected = []
    for job in progress(self.jobs):
        try:
            fetched = job.download_files(location, create)
            collected.extend(fetched)
        except HyP3SDKError as e:
            print(f'Warning: {e} Skipping download for {job}.')
    return collected

filter_jobs(succeeded=True, pending=True, running=True, failed=False, include_expired=True)

Filter jobs by status. By default, only succeeded, pending, and still running jobs will be in the returned batch.

Parameters:

Name Type Description Default
succeeded bool

Include all succeeded jobs

True
pending bool

Include all pending jobs

True
running bool

Include all running jobs

True
failed bool

Include all failed jobs

False
include_expired bool

Include expired jobs in the result

True

Returns:

Name Type Description
batch Batch

A batch object containing jobs matching any of the selected statuses

Source code in hyp3_sdk/jobs.py
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
def filter_jobs(
    self,
    succeeded: bool = True,
    pending: bool = True,
    running: bool = True,
    failed: bool = False,
    include_expired: bool = True,
) -> 'Batch':
    """Select jobs from the batch by status.

    With the default arguments the result holds the succeeded, pending and
    running jobs; failed jobs are left out.

    Args:
        succeeded: Keep jobs that have succeeded
        pending: Keep jobs that are pending
        running: Keep jobs that are running
        failed: Keep jobs that have failed
        include_expired: Keep succeeded jobs even when they have expired

    Returns:
         batch: A new batch holding the jobs whose status was selected
    """
    def _selected(job) -> bool:
        # A succeeded job that is selected is decided purely by the expiration rule;
        # the remaining status checks are not consulted for it.
        if job.succeeded() and succeeded:
            return include_expired or not job.expired()
        if job.running() and running:
            return True
        if job.pending() and pending:
            return True
        return job.failed() and failed

    return Batch([job for job in self.jobs if _selected(job)])

succeeded()

Returns: True if all jobs have succeeded, otherwise returns False

Source code in hyp3_sdk/jobs.py
219
220
221
222
223
224
225
226
def succeeded(self) -> bool:
    """
    Returns: True when every job in the batch has succeeded (also for an empty batch), otherwise False
    """
    return all(job.succeeded() for job in self.jobs)

HyP3

A Python wrapper around the HyP3 API.

Warning: All jobs submitted to HyP3 are publicly visible. For more information, see https://hyp3-docs.asf.alaska.edu/#public-visibility-of-jobs

Source code in hyp3_sdk/hyp3.py
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
class HyP3:
    """A python wrapper around the HyP3 API.

    Warning: All jobs submitted to HyP3 are publicly visible. For more information, see
    https://hyp3-docs.asf.alaska.edu/#public-visibility-of-jobs
    """

    def __init__(self, api_url: str = PROD_API, username: Optional[str] = None, password: Optional[str] = None,
                 prompt: bool = False):
        """If username and password are not provided, attempts to use credentials from a `.netrc` file.

        Args:
            api_url: Address of the HyP3 API
            username: Username for authenticating to `urs.earthdata.nasa.gov`.
                Both username and password must be provided if either is provided.
            password: Password for authenticating to `urs.earthdata.nasa.gov`.
                Both username and password must be provided if either is provided.
            prompt: Prompt for username and/or password interactively when they
                are not provided as keyword parameters
        """
        self.url = api_url

        if username is None and prompt:
            username = input('NASA Earthdata Login username: ')
        if password is None and prompt:
            password = getpass('NASA Earthdata Login password: ')

        self.session = hyp3_sdk.util.get_authenticated_session(username, password)
        self.session.headers.update({'User-Agent': f'{hyp3_sdk.__name__}/{hyp3_sdk.__version__}'})

    def find_jobs(self,
                  start: Optional[datetime] = None,
                  end: Optional[datetime] = None,
                  status_code: Optional[str] = None,
                  name: Optional[str] = None,
                  job_type: Optional[str] = None,
                  user_id: Optional[str] = None) -> Batch:
        """Gets a Batch of jobs from HyP3 matching the provided search criteria

        Args:
            start: only jobs submitted after given time
            end: only jobs submitted before given time
            status_code: only jobs matching this status (SUCCEEDED, FAILED, RUNNING, PENDING)
            name: only jobs with this name
            job_type: only jobs with this job_type
            user_id: only jobs submitted by this user (defaults to the current user)

        Returns:
            A Batch object containing the found jobs
        """
        # Explicit mapping instead of looking the arguments up through locals().
        search_criteria = {
            'start': start,
            'end': end,
            'status_code': status_code,
            'name': name,
            'job_type': job_type,
            'user_id': user_id,
        }

        params = {}
        for param_name, param_value in search_criteria.items():
            if param_value is None:
                continue
            if isinstance(param_value, datetime):
                # Naive datetimes are interpreted as UTC before being serialized.
                if param_value.tzinfo is None:
                    param_value = param_value.replace(tzinfo=timezone.utc)
                param_value = param_value.isoformat(timespec='seconds')
            params[param_name] = param_value

        response = self.session.get(urljoin(self.url, '/jobs'), params=params)
        _raise_for_hyp3_status(response)
        # Decode each page once; every call to response.json() re-parses the body.
        page = response.json()
        jobs = [Job.from_dict(job) for job in page['jobs']]

        # Follow the pagination links until a page no longer advertises a `next` URL.
        while 'next' in page:
            response = self.session.get(page['next'])
            _raise_for_hyp3_status(response)
            page = response.json()
            jobs.extend(Job.from_dict(job) for job in page['jobs'])

        return Batch(jobs)

    def get_job_by_id(self, job_id: str) -> Job:
        """Get job by job ID

        Args:
            job_id: A job ID

        Returns:
            A Job object
        """
        response = self.session.get(urljoin(self.url, f'/jobs/{job_id}'))
        _raise_for_hyp3_status(response)

        return Job.from_dict(response.json())

    @singledispatchmethod
    def watch(self, job_or_batch: Union[Batch, Job], timeout: int = 10800,
              interval: Union[int, float] = 60) -> Union[Batch, Job]:
        """Watch jobs until they complete

        Args:
            job_or_batch: A Batch or Job object of jobs to watch
            timeout: How long to wait until exiting in seconds
            interval: How often to check for updates in seconds

        Returns:
            A Batch or Job object with refreshed watched jobs

        Raises:
            NotImplementedError: if `job_or_batch` is neither a Batch nor a Job
            HyP3Error: if the jobs are not complete before the timeout
        """
        raise NotImplementedError(f'Cannot watch {type(job_or_batch)} type object')

    @watch.register
    def _watch_batch(self, batch: Batch, timeout: int = 10800, interval: Union[int, float] = 60) -> Batch:
        tqdm = hyp3_sdk.util.get_tqdm_progress_bar()
        iterations_until_timeout = math.ceil(timeout / interval)
        bar_format = '{l_bar}{bar}| {n_fmt}/{total_fmt} [{postfix[0]}]'
        with tqdm(total=len(batch), bar_format=bar_format, postfix=[f'timeout in {timeout} s']) as progress_bar:
            for ii in range(iterations_until_timeout):
                batch = self.refresh(batch)

                counts = batch._count_statuses()
                complete = counts['SUCCEEDED'] + counts['FAILED']

                progress_bar.postfix = [f'timeout in {timeout - ii * interval}s']
                # to control n/total manually; update is n += value
                progress_bar.n = complete
                progress_bar.update(0)

                if batch.complete():
                    return batch
                time.sleep(interval)
        raise HyP3Error(f'Timeout occurred while waiting for {batch}')

    @watch.register
    def _watch_job(self, job: Job, timeout: int = 10800, interval: Union[int, float] = 60) -> Job:
        tqdm = hyp3_sdk.util.get_tqdm_progress_bar()
        iterations_until_timeout = math.ceil(timeout / interval)
        bar_format = '{n_fmt}/{total_fmt} [{postfix[0]}]'
        with tqdm(total=1, bar_format=bar_format, postfix=[f'timeout in {timeout} s']) as progress_bar:
            for ii in range(iterations_until_timeout):
                job = self.refresh(job)
                progress_bar.postfix = [f'timeout in {timeout - ii * interval}s']
                progress_bar.update(int(job.complete()))

                if job.complete():
                    return job
                time.sleep(interval)
        raise HyP3Error(f'Timeout occurred while waiting for {job}')

    @singledispatchmethod
    def refresh(self, job_or_batch: Union[Batch, Job]) -> Union[Batch, Job]:
        """Refresh each job's information

        Args:
            job_or_batch: A Batch or Job object to refresh

        Returns:
            A Batch or Job object with refreshed information

        Raises:
            NotImplementedError: if `job_or_batch` is neither a Batch nor a Job
        """
        raise NotImplementedError(f'Cannot refresh {type(job_or_batch)} type object')

    @refresh.register
    def _refresh_batch(self, batch: Batch):
        # Builds a new Batch; the batch passed in is left untouched.
        return Batch([self.refresh(job) for job in batch.jobs])

    @refresh.register
    def _refresh_job(self, job: Job):
        return self.get_job_by_id(job.job_id)

    def submit_prepared_jobs(self, prepared_jobs: Union[dict, List[dict]]) -> Batch:
        """Submit a prepared job dictionary, or list of prepared job dictionaries

        Args:
            prepared_jobs: A prepared job dictionary, or list of prepared job dictionaries

        Returns:
            A Batch object containing the submitted job(s)
        """
        if isinstance(prepared_jobs, dict):
            payload = {'jobs': [prepared_jobs]}
        else:
            payload = {'jobs': prepared_jobs}

        response = self.session.post(urljoin(self.url, '/jobs'), json=payload)
        _raise_for_hyp3_status(response)

        return Batch([Job.from_dict(job) for job in response.json()['jobs']])

    def submit_autorift_job(self, granule1: str, granule2: str, name: Optional[str] = None) -> Batch:
        """Submit an autoRIFT job

        Args:
            granule1: The first granule (scene) to use
            granule2: The second granule (scene) to use
            name: A name for the job

        Returns:
            A Batch object containing the autoRIFT job
        """
        job_dict = self.prepare_autorift_job(granule1, granule2, name=name)
        return self.submit_prepared_jobs(prepared_jobs=job_dict)

    @classmethod
    def prepare_autorift_job(cls, granule1: str, granule2: str, name: Optional[str] = None) -> dict:
        """Prepare an autoRIFT job

        Args:
            granule1: The first granule (scene) to use
            granule2: The second granule (scene) to use
            name: A name for the job

        Returns:
            A dictionary containing the prepared autoRIFT job
        """
        job_dict = {
            'job_parameters': {'granules': [granule1, granule2]},
            'job_type': 'AUTORIFT',
        }
        if name is not None:
            job_dict['name'] = name
        return job_dict

    def submit_rtc_job(self,
                       granule: str,
                       name: Optional[str] = None,
                       dem_matching: bool = False,
                       include_dem: bool = False,
                       include_inc_map: bool = False,
                       include_rgb: bool = False,
                       include_scattering_area: bool = False,
                       radiometry: Literal['sigma0', 'gamma0'] = 'gamma0',
                       resolution: Literal[10, 20, 30] = 30,
                       scale: Literal['amplitude', 'decibel', 'power'] = 'power',
                       speckle_filter: bool = False,
                       dem_name: Literal['copernicus'] = 'copernicus') -> Batch:
        """Submit an RTC job

        Args:
            granule: The granule (scene) to use
            name: A name for the job
            dem_matching: Coregisters SAR data to the DEM, rather than using dead reckoning based on orbit files
            include_dem: Include the DEM file in the product package
            include_inc_map: Include the local incidence angle map in the product package
            include_rgb: Include a false-color RGB decomposition in the product package for dual-pol granules
                (ignored for single-pol granules)
            include_scattering_area: Include the scattering area in the product package
            radiometry: Backscatter coefficient normalization, either by ground area (sigma0) or illuminated area
                projected into the look direction (gamma0)
            resolution: Desired output pixel spacing in meters
            scale: Scale of output image; power, decibel or amplitude
            speckle_filter: Apply an Enhanced Lee speckle filter
            dem_name: Name of the DEM to use for processing. `copernicus` is the only option, and it will use
                the Copernicus GLO-30 Public DEM.

        Returns:
            A Batch object containing the RTC job
        """
        # Must stay the first statement: locals() is used to capture exactly the call
        # arguments. Work on a copy, as the sibling submit_* methods do.
        arguments = locals().copy()
        arguments.pop('self')
        job_dict = self.prepare_rtc_job(**arguments)
        return self.submit_prepared_jobs(prepared_jobs=job_dict)

    @classmethod
    def prepare_rtc_job(cls,
                        granule: str,
                        name: Optional[str] = None,
                        dem_matching: bool = False,
                        include_dem: bool = False,
                        include_inc_map: bool = False,
                        include_rgb: bool = False,
                        include_scattering_area: bool = False,
                        radiometry: Literal['sigma0', 'gamma0'] = 'gamma0',
                        resolution: Literal[10, 20, 30] = 30,
                        scale: Literal['amplitude', 'decibel', 'power'] = 'power',
                        speckle_filter: bool = False,
                        dem_name: Literal['copernicus'] = 'copernicus') -> dict:
        """Prepare an RTC job

        Args:
            granule: The granule (scene) to use
            name: A name for the job
            dem_matching: Coregisters SAR data to the DEM, rather than using dead reckoning based on orbit files
            include_dem: Include the DEM file in the product package
            include_inc_map: Include the local incidence angle map in the product package
            include_rgb: Include a false-color RGB decomposition in the product package for dual-pol granules
                (ignored for single-pol granules)
            include_scattering_area: Include the scattering area in the product package
            radiometry: Backscatter coefficient normalization, either by ground area (sigma0) or illuminated area
                projected into the look direction (gamma0)
            resolution: Desired output pixel spacing in meters
            scale: Scale of output image; power, decibel or amplitude
            speckle_filter: Apply an Enhanced Lee speckle filter
            dem_name: Name of the DEM to use for processing. `copernicus` is the only option, and it will use
                the Copernicus GLO-30 Public DEM.

        Returns:
            A dictionary containing the prepared RTC job
        """
        # Must stay the first statement: every remaining argument becomes a job parameter.
        job_parameters = locals().copy()
        for key in ['granule', 'name', 'cls']:
            job_parameters.pop(key, None)

        job_dict = {
            'job_parameters': {'granules': [granule], **job_parameters},
            'job_type': 'RTC_GAMMA',
        }

        if name is not None:
            job_dict['name'] = name
        return job_dict

    def submit_insar_job(self,
                         granule1: str,
                         granule2: str,
                         name: Optional[str] = None,
                         include_look_vectors: bool = False,
                         include_los_displacement: bool = False,
                         include_inc_map: bool = False,
                         looks: Literal['20x4', '10x2'] = '20x4',
                         include_dem: bool = False,
                         include_wrapped_phase: bool = False,
                         apply_water_mask: bool = False,
                         include_displacement_maps: bool = False,
                         phase_filter_parameter: float = 0.6) -> Batch:
        """Submit an InSAR job

        Args:
            granule1: The first granule (scene) to use
            granule2: The second granule (scene) to use
            name: A name for the job
            include_look_vectors: Include the look vector theta and phi files in the product package
            include_los_displacement: Include a GeoTIFF in the product package containing displacement values
                along the Line-Of-Sight (LOS). This parameter has been deprecated in favor of
                `include_displacement_maps`, and will be removed in a future release.
            include_inc_map: Include the local and ellipsoidal incidence angle maps in the product package
            looks: Number of looks to take in range and azimuth
            include_dem: Include the digital elevation model GeoTIFF in the product package
            include_wrapped_phase: Include the wrapped phase GeoTIFF in the product package
            apply_water_mask: Sets pixels over coastal waters and large inland waterbodies
                as invalid for phase unwrapping
            include_displacement_maps: Include displacement maps (line-of-sight and vertical) in the product package
            phase_filter_parameter: Adaptive phase filter parameter.
                Useful values fall in the range 0.2 to 1.
                Larger values result in stronger filtering.
                If zero, adaptive phase filter will be skipped.

        Returns:
            A Batch object containing the InSAR job
        """
        # Must stay the first statement: locals() captures exactly the call arguments.
        arguments = locals().copy()
        arguments.pop('self')
        job_dict = self.prepare_insar_job(**arguments)
        return self.submit_prepared_jobs(prepared_jobs=job_dict)

    @classmethod
    def prepare_insar_job(cls,
                          granule1: str,
                          granule2: str,
                          name: Optional[str] = None,
                          include_look_vectors: bool = False,
                          include_los_displacement: bool = False,
                          include_inc_map: bool = False,
                          looks: Literal['20x4', '10x2'] = '20x4',
                          include_dem: bool = False,
                          include_wrapped_phase: bool = False,
                          apply_water_mask: bool = False,
                          include_displacement_maps: bool = False,
                          phase_filter_parameter: float = 0.6) -> dict:
        """Prepare an InSAR job

        Args:
            granule1: The first granule (scene) to use
            granule2: The second granule (scene) to use
            name: A name for the job
            include_look_vectors: Include the look vector theta and phi files in the product package
            include_los_displacement: Include a GeoTIFF in the product package containing displacement values
                along the Line-Of-Sight (LOS). This parameter has been deprecated in favor of
                `include_displacement_maps`, and will be removed in a future release.
            include_inc_map: Include the local and ellipsoidal incidence angle maps in the product package
            looks: Number of looks to take in range and azimuth
            include_dem: Include the digital elevation model GeoTIFF in the product package
            include_wrapped_phase: Include the wrapped phase GeoTIFF in the product package
            apply_water_mask: Sets pixels over coastal waters and large inland waterbodies
                as invalid for phase unwrapping
            include_displacement_maps: Include displacement maps (line-of-sight and vertical) in the product package
            phase_filter_parameter: Adaptive phase filter parameter.
                Useful values fall in the range 0.2 to 1.
                Larger values result in stronger filtering.
                If zero, adaptive phase filter will be skipped.

        Returns:
            A dictionary containing the prepared InSAR job
        """
        if include_los_displacement:
            warnings.warn('The include_los_displacement parameter has been deprecated in favor of '
                          'include_displacement_maps, and will be removed in a future release.', FutureWarning)

        # No local variable may be introduced above this line: every remaining
        # argument becomes a job parameter.
        job_parameters = locals().copy()
        for key in ['cls', 'granule1', 'granule2', 'name']:
            job_parameters.pop(key)

        job_dict = {
            'job_parameters': {'granules': [granule1, granule2], **job_parameters},
            'job_type': 'INSAR_GAMMA',
        }
        if name is not None:
            job_dict['name'] = name
        return job_dict

    def submit_insar_isce_burst_job(self,
                                    granule1: str,
                                    granule2: str,
                                    name: Optional[str] = None,
                                    apply_water_mask: bool = False,
                                    looks: Literal['20x4', '10x2', '5x1'] = '20x4') -> Batch:
        """Submit an InSAR ISCE burst job.

        Args:
            granule1: The first granule (scene) to use
            granule2: The second granule (scene) to use
            name: A name for the job
            apply_water_mask: Sets pixels over coastal waters and large inland waterbodies
                as invalid for phase unwrapping
            looks: Number of looks to take in range and azimuth

        Returns:
            A Batch object containing the InSAR ISCE burst job
        """
        # Must stay the first statement: locals() captures exactly the call arguments.
        arguments = locals().copy()
        arguments.pop('self')
        job_dict = self.prepare_insar_isce_burst_job(**arguments)
        return self.submit_prepared_jobs(prepared_jobs=job_dict)

    @classmethod
    def prepare_insar_isce_burst_job(cls,
                                     granule1: str,
                                     granule2: str,
                                     name: Optional[str] = None,
                                     apply_water_mask: bool = False,
                                     looks: Literal['20x4', '10x2', '5x1'] = '20x4') -> dict:
        """Prepare an InSAR ISCE burst job.

        Args:
            granule1: The first granule (scene) to use
            granule2: The second granule (scene) to use
            name: A name for the job
            apply_water_mask: Sets pixels over coastal waters and large inland waterbodies
                as invalid for phase unwrapping
            looks: Number of looks to take in range and azimuth

        Returns:
            A dictionary containing the prepared InSAR ISCE burst job
        """
        # Must stay the first statement: every remaining argument becomes a job parameter.
        job_parameters = locals().copy()
        for key in ['cls', 'granule1', 'granule2', 'name']:
            job_parameters.pop(key)

        job_dict = {
            'job_parameters': {'granules': [granule1, granule2], **job_parameters},
            'job_type': 'INSAR_ISCE_BURST',
        }
        if name is not None:
            job_dict['name'] = name
        return job_dict

    def my_info(self) -> dict:
        """
        Returns:
            Your user information
        """
        response = self.session.get(urljoin(self.url, '/user'))
        _raise_for_hyp3_status(response)
        return response.json()

    def check_credits(self) -> Union[float, int, None]:
        """
        Returns:
            Your remaining processing credits, or None if you have no processing limit
        """
        info = self.my_info()
        return info['remaining_credits']

    def check_quota(self) -> Union[float, int, None]:
        """Deprecated method for checking your remaining processing credits; replaced by `HyP3.check_credits`

        Returns:
            Your remaining processing credits, or None if you have no processing limit
        """
        # stacklevel=2 attributes the warning to the caller's line.
        warn('This method is deprecated and will be removed in a future release.\n'
             'Please use `HyP3.check_credits` instead.', DeprecationWarning, stacklevel=2)
        return self.check_credits()

    def costs(self) -> dict:
        """
        Returns:
            Table of job costs
        """
        response = self.session.get(urljoin(self.url, '/costs'))
        _raise_for_hyp3_status(response)
        return response.json()

__init__(api_url=PROD_API, username=None, password=None, prompt=False)

If username and password are not provided, attempts to use credentials from a .netrc file.

Parameters:

Name Type Description Default
api_url str

Address of the HyP3 API

PROD_API
username Optional[str]

Username for authenticating to urs.earthdata.nasa.gov. Both username and password must be provided if either is provided.

None
password Optional[str]

Password for authenticating to urs.earthdata.nasa.gov. Both username and password must be provided if either is provided.

None
prompt bool

Prompt for username and/or password interactively when they are not provided as keyword parameters

False
Source code in hyp3_sdk/hyp3.py
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
def __init__(self, api_url: str = PROD_API, username: Optional[str] = None, password: Optional[str] = None,
             prompt: bool = False):
    """Create an authenticated session for talking to a HyP3 API.

    If username and password are not provided, attempts to use credentials from a `.netrc` file.

    Args:
        api_url: Address of the HyP3 API
        username: Username for authenticating to `urs.earthdata.nasa.gov`.
            Both username and password must be provided if either is provided.
        password: Password for authenticating to `urs.earthdata.nasa.gov`.
            Both username and password must be provided if either is provided.
        prompt: Prompt for username and/or password interactively when they
            are not provided as keyword parameters
    """
    self.url = api_url

    if prompt:
        # Only ask for whichever credential was not supplied by the caller.
        if username is None:
            username = input('NASA Earthdata Login username: ')
        if password is None:
            password = getpass('NASA Earthdata Login password: ')

    self.session = hyp3_sdk.util.get_authenticated_session(username, password)

    # Identify this SDK and its version on every request made through the session.
    user_agent = f'{hyp3_sdk.__name__}/{hyp3_sdk.__version__}'
    self.session.headers.update({'User-Agent': user_agent})

check_credits()

Returns:

Type Description
Union[float, int, None]

Your remaining processing credits, or None if you have no processing limit

Source code in hyp3_sdk/hyp3.py
491
492
493
494
495
496
497
def check_credits(self) -> Union[float, int, None]:
    """Look up the `remaining_credits` entry of your user information

    Returns:
        Your remaining processing credits, or None if you have no processing limit
    """
    return self.my_info()['remaining_credits']

check_quota()

Deprecated method for checking your remaining processing credits; replaced by HyP3.check_credits

Returns:

Type Description
Union[float, int, None]

Your remaining processing credits, or None if you have no processing limit

Source code in hyp3_sdk/hyp3.py
499
500
501
502
503
504
505
506
507
def check_quota(self) -> Union[float, int, None]:
    """Deprecated method for checking your remaining processing credits; replaced by `HyP3.check_credits`

    Emits a DeprecationWarning and then delegates to `HyP3.check_credits`.

    Returns:
        Your remaining processing credits, or None if you have no processing limit
    """
    deprecation_notice = (
        'This method is deprecated and will be removed in a future release.\n'
        'Please use `HyP3.check_credits` instead.'
    )
    # stacklevel=2 attributes the warning to the caller rather than to this method.
    warn(deprecation_notice, DeprecationWarning, stacklevel=2)
    return self.check_credits()

costs()

Returns:

Type Description
dict

Table of job costs

Source code in hyp3_sdk/hyp3.py
509
510
511
512
513
514
515
516
def costs(self) -> dict:
    """Fetch the job cost table from the API's `/costs` endpoint

    Returns:
        Table of job costs
    """
    costs_url = urljoin(self.url, '/costs')
    reply = self.session.get(costs_url)
    # Let the shared status helper inspect the response before its body is decoded.
    _raise_for_hyp3_status(reply)
    return reply.json()

find_jobs(start=None, end=None, status_code=None, name=None, job_type=None, user_id=None)

Gets a Batch of jobs from HyP3 matching the provided search criteria

Parameters:

Name Type Description Default
start Optional[datetime]

only jobs submitted after given time

None
end Optional[datetime]

only jobs submitted before given time

None
status_code Optional[str]

only jobs matching this status (SUCCEEDED, FAILED, RUNNING, PENDING)

None
name Optional[str]

only jobs with this name

None
job_type Optional[str]

only jobs with this job_type

None
user_id Optional[str]

only jobs submitted by this user (defaults to the current user)

None

Returns:

Type Description
Batch

A Batch object containing the found jobs

Source code in hyp3_sdk/hyp3.py
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
def find_jobs(self,
              start: Optional[datetime] = None,
              end: Optional[datetime] = None,
              status_code: Optional[str] = None,
              name: Optional[str] = None,
              job_type: Optional[str] = None,
              user_id: Optional[str] = None) -> Batch:
    """Gets a Batch of jobs from HyP3 matching the provided search criteria

    Results are collected across pages: as long as a response contains a `next`
    entry, that URL is requested and its jobs are appended.

    Args:
        start: only jobs submitted after given time
        end: only jobs submitted before given time
        status_code: only jobs matching this status (SUCCEEDED, FAILED, RUNNING, PENDING)
        name: only jobs with this name
        job_type: only jobs with this job_type
        user_id: only jobs submitted by this user (defaults to the current user)

    Returns:
        A Batch object containing the found jobs
    """
    # Spell the criteria out explicitly instead of introspecting `locals()`, so that
    # adding a local variable to this function can never leak into the query string.
    criteria = {
        'start': start,
        'end': end,
        'status_code': status_code,
        'name': name,
        'job_type': job_type,
        'user_id': user_id,
    }

    params = {}
    for param_name, param_value in criteria.items():
        if param_value is None:
            # Unset criteria are left out of the query entirely.
            continue
        if isinstance(param_value, datetime):
            # Naive datetimes are labelled as UTC before being serialized.
            if param_value.tzinfo is None:
                param_value = param_value.replace(tzinfo=timezone.utc)
            param_value = param_value.isoformat(timespec='seconds')
        params[param_name] = param_value

    response = self.session.get(urljoin(self.url, '/jobs'), params=params)
    _raise_for_hyp3_status(response)
    # Decode each page exactly once; every `.json()` call re-parses the whole body.
    page = response.json()
    jobs = [Job.from_dict(job) for job in page['jobs']]

    while 'next' in page:
        response = self.session.get(page['next'])
        _raise_for_hyp3_status(response)
        page = response.json()
        jobs.extend(Job.from_dict(job) for job in page['jobs'])

    return Batch(jobs)

get_job_by_id(job_id)

Get job by job ID

Parameters:

Name Type Description Default
job_id str

A job ID

required

Returns:

Type Description
Job

A Job object

Source code in hyp3_sdk/hyp3.py
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
def get_job_by_id(self, job_id: str) -> Job:
    """Get job by job ID

    The job is requested from the API path `/jobs/<job_id>`.

    Args:
        job_id: A job ID

    Returns:
        A Job object
    """
    job_url = urljoin(self.url, f'/jobs/{job_id}')
    reply = self.session.get(job_url)
    # Let the shared status helper inspect the response before its body is decoded.
    _raise_for_hyp3_status(reply)

    job_record = reply.json()
    return Job.from_dict(job_record)

my_info()

Returns:

Type Description
dict

Your user information

Source code in hyp3_sdk/hyp3.py
482
483
484
485
486
487
488
489
def my_info(self) -> dict:
    """Fetch the current user's record from the API's `/user` endpoint

    Returns:
        Your user information
    """
    user_url = urljoin(self.url, '/user')
    reply = self.session.get(user_url)
    # Let the shared status helper inspect the response before its body is decoded.
    _raise_for_hyp3_status(reply)
    return reply.json()

prepare_autorift_job(granule1, granule2, name=None) classmethod

Prepare an autoRIFT job

Parameters:

Name Type Description Default
granule1 str

The first granule (scene) to use

required
granule2 str

The second granule (scene) to use

required
name Optional[str]

A name for the job

None

Returns:

Type Description
dict

A dictionary containing the prepared autoRIFT job

Source code in hyp3_sdk/hyp3.py
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
@classmethod
def prepare_autorift_job(cls, granule1: str, granule2: str, name: Optional[str] = None) -> dict:
    """Prepare an autoRIFT job

    Nothing is sent to the API here; the job is only described as a dictionary.

    Args:
        granule1: The first granule (scene) to use
        granule2: The second granule (scene) to use
        name: A name for the job

    Returns:
        A dictionary containing the prepared autoRIFT job
    """
    prepared = {
        'job_parameters': {'granules': [granule1, granule2]},
        'job_type': 'AUTORIFT',
    }
    if name is None:
        # Unnamed jobs carry no 'name' key at all.
        return prepared

    prepared['name'] = name
    return prepared

prepare_insar_isce_burst_job(granule1, granule2, name=None, apply_water_mask=False, looks='20x4') classmethod

Prepare an InSAR ISCE burst job.

Parameters:

Name Type Description Default
granule1 str

The first granule (scene) to use

required
granule2 str

The second granule (scene) to use

required
name Optional[str]

A name for the job

None
apply_water_mask bool

Sets pixels over coastal waters and large inland waterbodies as invalid for phase unwrapping

False
looks Literal['20x4', '10x2', '5x1']

Number of looks to take in range and azimuth

'20x4'

Returns:

Type Description
dict

A dictionary containing the prepared InSAR ISCE burst job

Source code in hyp3_sdk/hyp3.py
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
@classmethod
def prepare_insar_isce_burst_job(cls,
                                 granule1: str,
                                 granule2: str,
                                 name: Optional[str] = None,
                                 apply_water_mask: bool = False,
                                 looks: Literal['20x4', '10x2', '5x1'] = '20x4') -> dict:
    """Prepare an InSAR ISCE burst job.

    Nothing is sent to the API here; the job is only described as a dictionary.

    Args:
        granule1: The first granule (scene) to use
        granule2: The second granule (scene) to use
        name: A name for the job
        apply_water_mask: Sets pixels over coastal waters and large inland waterbodies
            as invalid for phase unwrapping
        looks: Number of looks to take in range and azimuth

    Returns:
        A dictionary containing the prepared InSAR ISCE burst job
    """
    # Every argument other than the granules and the name is a processing option.
    job_parameters = {
        'granules': [granule1, granule2],
        'apply_water_mask': apply_water_mask,
        'looks': looks,
    }

    job_dict = {'job_parameters': job_parameters, 'job_type': 'INSAR_ISCE_BURST'}
    if name is not None:
        job_dict['name'] = name
    return job_dict

prepare_insar_job(granule1, granule2, name=None, include_look_vectors=False, include_los_displacement=False, include_inc_map=False, looks='20x4', include_dem=False, include_wrapped_phase=False, apply_water_mask=False, include_displacement_maps=False, phase_filter_parameter=0.6) classmethod

Prepare an InSAR job

Parameters:

Name Type Description Default
granule1 str

The first granule (scene) to use

required
granule2 str

The second granule (scene) to use

required
name Optional[str]

A name for the job

None
include_look_vectors bool

Include the look vector theta and phi files in the product package

False
include_los_displacement bool

Include a GeoTIFF in the product package containing displacement values along the Line-Of-Sight (LOS). This parameter has been deprecated in favor of include_displacement_maps, and will be removed in a future release.

False
include_inc_map bool

Include the local and ellipsoidal incidence angle maps in the product package

False
looks Literal['20x4', '10x2']

Number of looks to take in range and azimuth

'20x4'
include_dem bool

Include the digital elevation model GeoTIFF in the product package

False
include_wrapped_phase bool

Include the wrapped phase GeoTIFF in the product package

False
apply_water_mask bool

Sets pixels over coastal waters and large inland waterbodies as invalid for phase unwrapping

False
include_displacement_maps bool

Include displacement maps (line-of-sight and vertical) in the product package

False
phase_filter_parameter float

Adaptive phase filter parameter. Useful values fall in the range 0.2 to 1. Larger values result in stronger filtering. If zero, adaptive phase filter will be skipped.

0.6

Returns:

Type Description
dict

A dictionary containing the prepared InSAR job

Source code in hyp3_sdk/hyp3.py
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
@classmethod
def prepare_insar_job(cls,
                      granule1: str,
                      granule2: str,
                      name: Optional[str] = None,
                      include_look_vectors: bool = False,
                      include_los_displacement: bool = False,
                      include_inc_map: bool = False,
                      looks: Literal['20x4', '10x2'] = '20x4',
                      include_dem: bool = False,
                      include_wrapped_phase: bool = False,
                      apply_water_mask: bool = False,
                      include_displacement_maps: bool = False,
                      phase_filter_parameter: float = 0.6) -> dict:
    """Prepare an InSAR job

    Nothing is sent to the API here; the job is only described as a dictionary.
    A FutureWarning is issued when `include_los_displacement` is set.

    Args:
        granule1: The first granule (scene) to use
        granule2: The second granule (scene) to use
        name: A name for the job
        include_look_vectors: Include the look vector theta and phi files in the product package
        include_los_displacement: Include a GeoTIFF in the product package containing displacement values
            along the Line-Of-Sight (LOS). This parameter has been deprecated in favor of
            `include_displacement_maps`, and will be removed in a future release.
        include_inc_map: Include the local and ellipsoidal incidence angle maps in the product package
        looks: Number of looks to take in range and azimuth
        include_dem: Include the digital elevation model GeoTIFF in the product package
        include_wrapped_phase: Include the wrapped phase GeoTIFF in the product package
        apply_water_mask: Sets pixels over coastal waters and large inland waterbodies
            as invalid for phase unwrapping
        include_displacement_maps: Include displacement maps (line-of-sight and vertical) in the product package
        phase_filter_parameter: Adaptive phase filter parameter.
            Useful values fall in the range 0.2 to 1.
            Larger values result in stronger filtering.
            If zero, adaptive phase filter will be skipped.

    Returns:
        A dictionary containing the prepared InSAR job
    """
    if include_los_displacement:
        deprecation_notice = ('The include_los_displacement parameter has been deprecated in favor of '
                              'include_displacement_maps, and will be removed in a future release.')
        warnings.warn(deprecation_notice, FutureWarning)

    # Every argument other than the granules and the name is a processing option.
    job_parameters = {
        'granules': [granule1, granule2],
        'include_look_vectors': include_look_vectors,
        'include_los_displacement': include_los_displacement,
        'include_inc_map': include_inc_map,
        'looks': looks,
        'include_dem': include_dem,
        'include_wrapped_phase': include_wrapped_phase,
        'apply_water_mask': apply_water_mask,
        'include_displacement_maps': include_displacement_maps,
        'phase_filter_parameter': phase_filter_parameter,
    }

    job_dict = {'job_parameters': job_parameters, 'job_type': 'INSAR_GAMMA'}
    if name is not None:
        job_dict['name'] = name
    return job_dict

prepare_rtc_job(granule, name=None, dem_matching=False, include_dem=False, include_inc_map=False, include_rgb=False, include_scattering_area=False, radiometry='gamma0', resolution=30, scale='power', speckle_filter=False, dem_name='copernicus') classmethod

Prepare an RTC job

Parameters:

Name Type Description Default
granule str

The granule (scene) to use

required
name Optional[str]

A name for the job

None
dem_matching bool

Coregisters SAR data to the DEM, rather than using dead reckoning based on orbit files

False
include_dem bool

Include the DEM file in the product package

False
include_inc_map bool

Include the local incidence angle map in the product package

False
include_rgb bool

Include a false-color RGB decomposition in the product package for dual-pol granules (ignored for single-pol granules)

False
include_scattering_area bool

Include the scattering area in the product package

False
radiometry Literal['sigma0', 'gamma0']

Backscatter coefficient normalization, either by ground area (sigma0) or illuminated area projected into the look direction (gamma0)

'gamma0'
resolution Literal[10, 20, 30]

Desired output pixel spacing in meters

30
scale Literal['amplitude', 'decibel', 'power']

Scale of output image; power, decibel or amplitude

'power'
speckle_filter bool

Apply an Enhanced Lee speckle filter

False
dem_name Literal['copernicus']

Name of the DEM to use for processing. copernicus is the only option, and it will use the Copernicus GLO-30 Public DEM.

'copernicus'

Returns:

Type Description
dict

A dictionary containing the prepared RTC job

Source code in hyp3_sdk/hyp3.py
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
@classmethod
def prepare_rtc_job(cls,
                    granule: str,
                    name: Optional[str] = None,
                    dem_matching: bool = False,
                    include_dem: bool = False,
                    include_inc_map: bool = False,
                    include_rgb: bool = False,
                    include_scattering_area: bool = False,
                    radiometry: Literal['sigma0', 'gamma0'] = 'gamma0',
                    resolution: Literal[10, 20, 30] = 30,
                    scale: Literal['amplitude', 'decibel', 'power'] = 'power',
                    speckle_filter: bool = False,
                    dem_name: Literal['copernicus'] = 'copernicus') -> dict:
    """Prepare an RTC job

    Nothing is sent to the API here; the job is only described as a dictionary.

    Args:
        granule: The granule (scene) to use
        name: A name for the job
        dem_matching: Coregisters SAR data to the DEM, rather than using dead reckoning based on orbit files
        include_dem: Include the DEM file in the product package
        include_inc_map: Include the local incidence angle map in the product package
        include_rgb: Include a false-color RGB decomposition in the product package for dual-pol granules
            (ignored for single-pol granules)
        include_scattering_area: Include the scattering area in the product package
        radiometry: Backscatter coefficient normalization, either by ground area (sigma0) or illuminated area
            projected into the look direction (gamma0)
        resolution: Desired output pixel spacing in meters
        scale: Scale of output image; power, decibel or amplitude
        speckle_filter: Apply an Enhanced Lee speckle filter
        dem_name: Name of the DEM to use for processing. `copernicus` is the only option, and it will use
            the Copernicus GLO-30 Public DEM.

    Returns:
        A dictionary containing the prepared RTC job
    """
    # Every argument other than the granule and the name is a processing option.
    job_parameters = {
        'granules': [granule],
        'dem_matching': dem_matching,
        'include_dem': include_dem,
        'include_inc_map': include_inc_map,
        'include_rgb': include_rgb,
        'include_scattering_area': include_scattering_area,
        'radiometry': radiometry,
        'resolution': resolution,
        'scale': scale,
        'speckle_filter': speckle_filter,
        'dem_name': dem_name,
    }

    job_dict = {'job_parameters': job_parameters, 'job_type': 'RTC_GAMMA'}
    if name is not None:
        job_dict['name'] = name
    return job_dict

refresh(job_or_batch)

Refresh each job's information

Parameters:

Name Type Description Default
job_or_batch Union[Batch, Job]

A Batch or Job object to refresh

required

Returns:

Type Description
Union[Batch, Job]

A Batch or Job object with refreshed information

Source code in hyp3_sdk/hyp3.py
160
161
162
163
164
165
166
167
168
169
170
@singledispatchmethod
def refresh(self, job_or_batch: Union[Batch, Job]) -> Union[Batch, Job]:
    """Refresh each job's information

    This is the base implementation of the single-dispatch method; on its own it
    only rejects its argument. Type-specific handlers are not part of this listing.

    Args:
        job_or_batch: A Batch or Job object to refresh

    Returns:
        A Batch or Job object with refreshed information

    Raises:
        NotImplementedError: whenever this base implementation is the one invoked
    """
    unsupported_type = type(job_or_batch)
    raise NotImplementedError(f'Cannot refresh {unsupported_type} type object')

submit_autorift_job(granule1, granule2, name=None)

Submit an autoRIFT job

Parameters:

Name Type Description Default
granule1 str

The first granule (scene) to use

required
granule2 str

The second granule (scene) to use

required
name Optional[str]

A name for the job

None

Returns:

Type Description
Batch

A Batch object containing the autoRIFT job

Source code in hyp3_sdk/hyp3.py
205
206
207
208
209
210
211
212
213
214
215
216
217
def submit_autorift_job(self, granule1: str, granule2: str, name: Optional[str] = None) -> Batch:
    """Submit an autoRIFT job

    The job is first built with `prepare_autorift_job` and then handed to
    `submit_prepared_jobs`.

    Args:
        granule1: The first granule (scene) to use
        granule2: The second granule (scene) to use
        name: A name for the job

    Returns:
        A Batch object containing the autoRIFT job
    """
    prepared = self.prepare_autorift_job(granule1, granule2, name=name)
    return self.submit_prepared_jobs(prepared_jobs=prepared)

submit_insar_isce_burst_job(granule1, granule2, name=None, apply_water_mask=False, looks='20x4')

Submit an InSAR ISCE burst job.

Parameters:

Name Type Description Default
granule1 str

The first granule (scene) to use

required
granule2 str

The second granule (scene) to use

required
name Optional[str]

A name for the job

None
apply_water_mask bool

Sets pixels over coastal waters and large inland waterbodies as invalid for phase unwrapping

False
looks Literal['20x4', '10x2', '5x1']

Number of looks to take in range and azimuth

'20x4'

Returns:

Type Description
Batch

A Batch object containing the InSAR ISCE burst job

Source code in hyp3_sdk/hyp3.py
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
def submit_insar_isce_burst_job(self,
                                granule1: str,
                                granule2: str,
                                name: Optional[str] = None,
                                apply_water_mask: bool = False,
                                looks: Literal['20x4', '10x2', '5x1'] = '20x4') -> Batch:
    """Submit an InSAR ISCE burst job.

    The job is first built with `prepare_insar_isce_burst_job` and then handed to
    `submit_prepared_jobs`.

    Args:
        granule1: The first granule (scene) to use
        granule2: The second granule (scene) to use
        name: A name for the job
        apply_water_mask: Sets pixels over coastal waters and large inland waterbodies
            as invalid for phase unwrapping
        looks: Number of looks to take in range and azimuth

    Returns:
        A Batch object containing the InSAR ISCE burst job
    """
    # Forward every argument by keyword, exactly as received.
    prepared = self.prepare_insar_isce_burst_job(
        granule1=granule1,
        granule2=granule2,
        name=name,
        apply_water_mask=apply_water_mask,
        looks=looks,
    )
    return self.submit_prepared_jobs(prepared_jobs=prepared)

submit_insar_job(granule1, granule2, name=None, include_look_vectors=False, include_los_displacement=False, include_inc_map=False, looks='20x4', include_dem=False, include_wrapped_phase=False, apply_water_mask=False, include_displacement_maps=False, phase_filter_parameter=0.6)

Submit an InSAR job

Parameters:

Name Type Description Default
granule1 str

The first granule (scene) to use

required
granule2 str

The second granule (scene) to use

required
name Optional[str]

A name for the job

None
include_look_vectors bool

Include the look vector theta and phi files in the product package

False
include_los_displacement bool

Include a GeoTIFF in the product package containing displacement values along the Line-Of-Sight (LOS). This parameter has been deprecated in favor of include_displacement_maps, and will be removed in a future release.

False
include_inc_map bool

Include the local and ellipsoidal incidence angle maps in the product package

False
looks Literal['20x4', '10x2']

Number of looks to take in range and azimuth

'20x4'
include_dem bool

Include the digital elevation model GeoTIFF in the product package

False
include_wrapped_phase bool

Include the wrapped phase GeoTIFF in the product package

False
apply_water_mask bool

Sets pixels over coastal waters and large inland waterbodies as invalid for phase unwrapping

False
include_displacement_maps bool

Include displacement maps (line-of-sight and vertical) in the product package

False
phase_filter_parameter float

Adaptive phase filter parameter. Useful values fall in the range 0.2 to 1. Larger values result in stronger filtering. If zero, adaptive phase filter will be skipped.

0.6

Returns:

Type Description
Batch

A Batch object containing the InSAR job

Source code in hyp3_sdk/hyp3.py
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
def submit_insar_job(self,
                     granule1: str,
                     granule2: str,
                     name: Optional[str] = None,
                     include_look_vectors: bool = False,
                     include_los_displacement: bool = False,
                     include_inc_map: bool = False,
                     looks: Literal['20x4', '10x2'] = '20x4',
                     include_dem: bool = False,
                     include_wrapped_phase: bool = False,
                     apply_water_mask: bool = False,
                     include_displacement_maps: bool = False,
                     phase_filter_parameter: float = 0.6) -> Batch:
    """Submit an InSAR job

    The job is first built with `prepare_insar_job` and then handed to
    `submit_prepared_jobs`.

    Args:
        granule1: The first granule (scene) to use
        granule2: The second granule (scene) to use
        name: A name for the job
        include_look_vectors: Include the look vector theta and phi files in the product package
        include_los_displacement: Include a GeoTIFF in the product package containing displacement values
            along the Line-Of-Sight (LOS). This parameter has been deprecated in favor of
            `include_displacement_maps`, and will be removed in a future release.
        include_inc_map: Include the local and ellipsoidal incidence angle maps in the product package
        looks: Number of looks to take in range and azimuth
        include_dem: Include the digital elevation model GeoTIFF in the product package
        include_wrapped_phase: Include the wrapped phase GeoTIFF in the product package
        apply_water_mask: Sets pixels over coastal waters and large inland waterbodies
            as invalid for phase unwrapping
        include_displacement_maps: Include displacement maps (line-of-sight and vertical) in the product package
        phase_filter_parameter: Adaptive phase filter parameter.
            Useful values fall in the range 0.2 to 1.
            Larger values result in stronger filtering.
            If zero, adaptive phase filter will be skipped.

    Returns:
        A Batch object containing the InSAR job
    """
    # Forward every argument by keyword, exactly as received.
    prepared = self.prepare_insar_job(
        granule1=granule1,
        granule2=granule2,
        name=name,
        include_look_vectors=include_look_vectors,
        include_los_displacement=include_los_displacement,
        include_inc_map=include_inc_map,
        looks=looks,
        include_dem=include_dem,
        include_wrapped_phase=include_wrapped_phase,
        apply_water_mask=apply_water_mask,
        include_displacement_maps=include_displacement_maps,
        phase_filter_parameter=phase_filter_parameter,
    )
    return self.submit_prepared_jobs(prepared_jobs=prepared)

submit_prepared_jobs(prepared_jobs)

Submit a prepared job dictionary, or list of prepared job dictionaries

Parameters:

Name Type Description Default
prepared_jobs Union[dict, List[dict]]

A prepared job dictionary, or list of prepared job dictionaries

required

Returns:

Type Description
Batch

A Batch object containing the submitted job(s)

Source code in hyp3_sdk/hyp3.py
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
def submit_prepared_jobs(self, prepared_jobs: Union[dict, List[dict]]) -> Batch:
    """Submit a prepared job dictionary, or list of prepared job dictionaries

    The jobs are POSTed to the API's `/jobs` endpoint as a JSON payload of the
    form `{'jobs': [...]}`.

    Args:
        prepared_jobs: A prepared job dictionary, or list of prepared job dictionaries

    Returns:
        A Batch object containing the submitted job(s)
    """
    # A single prepared job is wrapped in a list; anything else is sent as given.
    jobs_to_submit = [prepared_jobs] if isinstance(prepared_jobs, dict) else prepared_jobs

    jobs_url = urljoin(self.url, '/jobs')
    reply = self.session.post(jobs_url, json={'jobs': jobs_to_submit})
    _raise_for_hyp3_status(reply)

    submitted = [Job.from_dict(job_record) for job_record in reply.json()['jobs']]
    return Batch(submitted)

submit_rtc_job(granule, name=None, dem_matching=False, include_dem=False, include_inc_map=False, include_rgb=False, include_scattering_area=False, radiometry='gamma0', resolution=30, scale='power', speckle_filter=False, dem_name='copernicus')

Submit an RTC job

Parameters:

Name Type Description Default
granule str

The granule (scene) to use

required
name Optional[str]

A name for the job

None
dem_matching bool

Coregisters SAR data to the DEM, rather than using dead reckoning based on orbit files

False
include_dem bool

Include the DEM file in the product package

False
include_inc_map bool

Include the local incidence angle map in the product package

False
include_rgb bool

Include a false-color RGB decomposition in the product package for dual-pol granules (ignored for single-pol granules)

False
include_scattering_area bool

Include the scattering area in the product package

False
radiometry Literal['sigma0', 'gamma0']

Backscatter coefficient normalization, either by ground area (sigma0) or illuminated area projected into the look direction (gamma0)

'gamma0'
resolution Literal[10, 20, 30]

Desired output pixel spacing in meters

30
scale Literal['amplitude', 'decibel', 'power']

Scale of output image; power, decibel or amplitude

'power'
speckle_filter bool

Apply an Enhanced Lee speckle filter

False
dem_name Literal['copernicus']

Name of the DEM to use for processing. copernicus is the only option, and it will use the Copernicus GLO-30 Public DEM.

'copernicus'

Returns:

Type Description
Batch

A Batch object containing the RTC job

Source code in hyp3_sdk/hyp3.py
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
def submit_rtc_job(self,
                   granule: str,
                   name: Optional[str] = None,
                   dem_matching: bool = False,
                   include_dem: bool = False,
                   include_inc_map: bool = False,
                   include_rgb: bool = False,
                   include_scattering_area: bool = False,
                   radiometry: Literal['sigma0', 'gamma0'] = 'gamma0',
                   resolution: Literal[10, 20, 30] = 30,
                   scale: Literal['amplitude', 'decibel', 'power'] = 'power',
                   speckle_filter: bool = False,
                   dem_name: Literal['copernicus'] = 'copernicus') -> Batch:
    """Submit an RTC job

    The job is first built with `prepare_rtc_job` and then handed to
    `submit_prepared_jobs`.

    Args:
        granule: The granule (scene) to use
        name: A name for the job
        dem_matching: Coregisters SAR data to the DEM, rather than using dead reckoning based on orbit files
        include_dem: Include the DEM file in the product package
        include_inc_map: Include the local incidence angle map in the product package
        include_rgb: Include a false-color RGB decomposition in the product package for dual-pol granules
            (ignored for single-pol granules)
        include_scattering_area: Include the scattering area in the product package
        radiometry: Backscatter coefficient normalization, either by ground area (sigma0) or illuminated area
            projected into the look direction (gamma0)
        resolution: Desired output pixel spacing in meters
        scale: Scale of output image; power, decibel or amplitude
        speckle_filter: Apply an Enhanced Lee speckle filter
        dem_name: Name of the DEM to use for processing. `copernicus` is the only option, and it will use
            the Copernicus GLO-30 Public DEM.

    Returns:
        A Batch object containing the RTC job
    """
    # Forward the arguments explicitly rather than popping 'self' out of the mapping
    # returned by `locals()`: that mapping is not meant to be modified.
    job_dict = self.prepare_rtc_job(
        granule=granule,
        name=name,
        dem_matching=dem_matching,
        include_dem=include_dem,
        include_inc_map=include_inc_map,
        include_rgb=include_rgb,
        include_scattering_area=include_scattering_area,
        radiometry=radiometry,
        resolution=resolution,
        scale=scale,
        speckle_filter=speckle_filter,
        dem_name=dem_name,
    )
    return self.submit_prepared_jobs(prepared_jobs=job_dict)

watch(job_or_batch, timeout=10800, interval=60)

Watch jobs until they complete

Parameters:

Name Type Description Default
job_or_batch Union[Batch, Job]

A Batch or Job object of jobs to watch

required
timeout int

How long to wait until exiting in seconds

10800
interval Union[int, float]

How often to check for updates in seconds

60

Returns:

Type Description
Union[Batch, Job]

A Batch or Job object with refreshed watched jobs

Source code in hyp3_sdk/hyp3.py
107
108
109
110
111
112
113
114
115
116
117
118
119
120
@singledispatchmethod
def watch(self, job_or_batch: Union[Batch, Job], timeout: int = 10800,
          interval: Union[int, float] = 60) -> Union[Batch, Job]:
    """Watch jobs until they complete

    This is the base implementation of the single-dispatch method; on its own it
    only rejects its argument. Type-specific handlers are not part of this listing.

    Args:
        job_or_batch: A Batch or Job object of jobs to watch
        timeout: How long to wait until exiting in seconds
        interval: How often to check for updates in seconds

    Returns:
        A Batch or Job object with refreshed watched jobs

    Raises:
        NotImplementedError: whenever this base implementation is the one invoked
    """
    unsupported_type = type(job_or_batch)
    raise NotImplementedError(f'Cannot watch {unsupported_type} type object')

Job

Source code in hyp3_sdk/jobs.py
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
class Job:
    """A single HyP3 job and the metadata that describes it.

    Instances are plain attribute containers; `from_dict` and `to_dict` convert
    between a Job and its dictionary form.
    """
    # Attributes kept by `to_dict(for_resubmit=True)`
    _attributes_for_resubmit = {'name', 'job_parameters', 'job_type'}

    def __init__(
            self,
            job_type: str,
            job_id: str,
            request_time: datetime,
            status_code: str,
            user_id: str,
            name: Optional[str] = None,
            job_parameters: Optional[dict] = None,
            files: Optional[List] = None,
            logs: Optional[List] = None,
            browse_images: Optional[List] = None,
            thumbnail_images: Optional[List] = None,
            expiration_time: Optional[datetime] = None,
            processing_times: Optional[List[float]] = None,
            credit_cost: Optional[float] = None,
    ):
        """Store the given job metadata as attributes; no validation is performed."""
        # NOTE: `to_dict` iterates `vars(self)`, so this assignment order is also its key order
        self.job_id = job_id
        self.job_type = job_type
        self.request_time = request_time
        self.status_code = status_code
        self.user_id = user_id
        self.name = name
        self.job_parameters = job_parameters
        self.files = files
        self.logs = logs
        self.browse_images = browse_images
        self.thumbnail_images = thumbnail_images
        self.expiration_time = expiration_time
        self.processing_times = processing_times
        self.credit_cost = credit_cost

    def __repr__(self):
        return f'Job.from_dict({self.to_dict()})'

    def __str__(self):
        return f'HyP3 {self.job_type} job {self.job_id}'

    def __eq__(self, other):
        """Two jobs are equal when all of their attributes are equal."""
        # Let Python fall back to its default handling for non-Job operands
        # instead of failing on objects that have no `__dict__`.
        if not isinstance(other, Job):
            return NotImplemented
        return self.__dict__ == other.__dict__

    @staticmethod
    def from_dict(input_dict: dict):
        """Build a Job from its dictionary form.

        Args:
            input_dict: Job description; must contain `job_type`, `job_id`, `request_time`,
                `status_code` and `user_id`. All other keys are optional.

        Returns:
            A Job whose `request_time` (and `expiration_time`, when present and non-empty)
            have been parsed with `parse_date`.

        Raises:
            KeyError: If one of the required keys is missing
        """
        expiration_time = parse_date(input_dict['expiration_time']) if input_dict.get('expiration_time') else None
        return Job(
            job_type=input_dict['job_type'],
            job_id=input_dict['job_id'],
            request_time=parse_date(input_dict['request_time']),
            status_code=input_dict['status_code'],
            user_id=input_dict['user_id'],
            name=input_dict.get('name'),
            job_parameters=input_dict.get('job_parameters'),
            files=input_dict.get('files'),
            logs=input_dict.get('logs'),
            browse_images=input_dict.get('browse_images'),
            thumbnail_images=input_dict.get('thumbnail_images'),
            expiration_time=expiration_time,
            processing_times=input_dict.get('processing_times'),
            credit_cost=input_dict.get('credit_cost'),
        )

    def to_dict(self, for_resubmit: bool = False):
        """Convert this job to a dictionary.

        Attributes whose value is None are left out, and datetime values are rendered as
        ISO 8601 strings with second precision.

        Args:
            for_resubmit: Only include the attributes listed in `_attributes_for_resubmit`

        Returns:
            A dictionary mapping attribute names to their values
        """
        if for_resubmit:
            keys_to_process = Job._attributes_for_resubmit
        else:
            keys_to_process = vars(self).keys()

        job_dict = {}
        for key in keys_to_process:
            value = getattr(self, key)
            if value is None:
                continue
            if isinstance(value, datetime):
                value = value.isoformat(timespec='seconds')
            job_dict[key] = value

        return job_dict

    def succeeded(self) -> bool:
        """Return True when the job's status code is SUCCEEDED."""
        return self.status_code == 'SUCCEEDED'

    def failed(self) -> bool:
        """Return True when the job's status code is FAILED."""
        return self.status_code == 'FAILED'

    def complete(self) -> bool:
        """Return True when the job has either succeeded or failed."""
        return self.succeeded() or self.failed()

    def pending(self) -> bool:
        """Return True when the job's status code is PENDING."""
        return self.status_code == 'PENDING'

    def running(self) -> bool:
        """Return True when the job's status code is RUNNING."""
        return self.status_code == 'RUNNING'

    def expired(self) -> bool:
        """Return True when an expiration time is set and the current UTC time has reached it."""
        return self.expiration_time is not None and datetime.now(tz.UTC) >= self.expiration_time

    def download_files(self, location: Union[Path, str] = '.', create: bool = True) -> List[Path]:
        """Download every file of this job into a directory.

        Args:
            location: Directory location to put files into
            create: Create `location` if it does not point to an existing directory

        Returns: list of Path objects to downloaded files

        Raises:
            HyP3SDKError: If the job has not succeeded, has expired, or a file cannot be downloaded
            NotADirectoryError: If `create` is False and `location` is not an existing directory
        """
        location = Path(location)

        if not self.succeeded():
            raise HyP3SDKError(f'Only succeeded jobs can be downloaded; job is {self.status_code}.')
        if self.expired():
            raise HyP3SDKError(f'Expired jobs cannot be downloaded; '
                               f'job expired {self.expiration_time.isoformat(timespec="seconds")}.')

        if create:
            location.mkdir(parents=True, exist_ok=True)
        elif not location.is_dir():
            raise NotADirectoryError(str(location))

        downloaded_files = []
        for file in self.files:
            download_url = file['url']
            filename = location / file['filename']
            try:
                downloaded_files.append(download_file(download_url, filename, chunk_size=10485760))
            except HTTPError as err:
                # Chain the HTTP error so the underlying cause stays visible in the traceback
                raise HyP3SDKError(f'Unable to download file: {download_url}') from err
        return downloaded_files

download_files(location='.', create=True)

Parameters:

Name Type Description Default
location Union[Path, str]

Directory location to put files into

'.'
create bool

Create location if it does not point to an existing directory

True

Returns: list of Path objects to downloaded files

Source code in hyp3_sdk/jobs.py
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
def download_files(self, location: Union[Path, str] = '.', create: bool = True) -> List[Path]:
    """Download every file of this job into a directory.

    Args:
        location: Directory location to put files into
        create: Create `location` if it does not point to an existing directory

    Returns: list of Path objects to downloaded files

    Raises:
        HyP3SDKError: If the job has not succeeded, has expired, or a file cannot be downloaded
        NotADirectoryError: If `create` is False and `location` is not an existing directory
    """
    location = Path(location)

    if not self.succeeded():
        raise HyP3SDKError(f'Only succeeded jobs can be downloaded; job is {self.status_code}.')
    if self.expired():
        raise HyP3SDKError(f'Expired jobs cannot be downloaded; '
                           f'job expired {self.expiration_time.isoformat(timespec="seconds")}.')

    if create:
        location.mkdir(parents=True, exist_ok=True)
    elif not location.is_dir():
        raise NotADirectoryError(str(location))

    downloaded_files = []
    for file in self.files:
        download_url = file['url']
        filename = location / file['filename']
        try:
            downloaded_files.append(download_file(download_url, filename, chunk_size=10485760))
        except HTTPError as err:
            # Chain the HTTP error so the underlying cause stays visible in the traceback
            raise HyP3SDKError(f'Unable to download file: {download_url}') from err
    return downloaded_files

exceptions

Errors and exceptions to raise when the SDK runs into problems

ASFSearchError

Bases: HyP3SDKError

Raise for errors when using the ASF Search module

Source code in hyp3_sdk/exceptions.py
15
16
class ASFSearchError(HyP3SDKError):
    """Error raised when something goes wrong while using the ASF Search module"""

AuthenticationError

Bases: HyP3SDKError

Raise when authentication does not succeed

Source code in hyp3_sdk/exceptions.py
23
24
class AuthenticationError(HyP3SDKError):
    """Error raised when an attempt to authenticate fails"""

HyP3Error

Bases: HyP3SDKError

Raise for errors when using the HyP3 module

Source code in hyp3_sdk/exceptions.py
11
12
class HyP3Error(HyP3SDKError):
    """Error raised when something goes wrong while using the HyP3 module"""

HyP3SDKError

Bases: Exception

Base Exception for the HyP3 SDK

Source code in hyp3_sdk/exceptions.py
7
8
class HyP3SDKError(Exception):
    """Common base class for the exceptions raised by the HyP3 SDK"""

ServerError

Bases: HyP3SDKError

Raise when the HyP3 SDK encounters a server error

Source code in hyp3_sdk/exceptions.py
19
20
class ServerError(HyP3SDKError):
    """Error raised when the HyP3 SDK runs into a server error"""

hyp3

HyP3

A python wrapper around the HyP3 API.

Warning: All jobs submitted to HyP3 are publicly visible. For more information, see https://hyp3-docs.asf.alaska.edu/#public-visibility-of-jobs

Source code in hyp3_sdk/hyp3.py
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
class HyP3:
    """A python wrapper around the HyP3 API.

    Warning: All jobs submitted to HyP3 are publicly visible. For more information, see
    https://hyp3-docs.asf.alaska.edu/#public-visibility-of-jobs
    """

    def __init__(self, api_url: str = PROD_API, username: Optional[str] = None, password: Optional[str] = None,
                 prompt: bool = False):
        """Set up a client for a HyP3 API with an authenticated session.

        When neither username nor password is given, credentials from a `.netrc` file are tried.

        Args:
            api_url: Address of the HyP3 API
            username: Username for authenticating to `urs.earthdata.nasa.gov`.
                Both username and password must be provided if either is provided.
            password: Password for authenticating to `urs.earthdata.nasa.gov`.
                Both username and password must be provided if either is provided.
            prompt: Prompt for username and/or password interactively when they
                are not provided as keyword parameters
        """
        self.url = api_url

        if prompt:
            # Only ask for the credentials the caller did not pass in
            if username is None:
                username = input('NASA Earthdata Login username: ')
            if password is None:
                password = getpass('NASA Earthdata Login password: ')

        self.session = hyp3_sdk.util.get_authenticated_session(username, password)
        user_agent = f'{hyp3_sdk.__name__}/{hyp3_sdk.__version__}'
        self.session.headers.update({'User-Agent': user_agent})

    def find_jobs(self,
                  start: Optional[datetime] = None,
                  end: Optional[datetime] = None,
                  status_code: Optional[str] = None,
                  name: Optional[str] = None,
                  job_type: Optional[str] = None,
                  user_id: Optional[str] = None) -> Batch:
        """Gets a Batch of jobs from HyP3 matching the provided search criteria

        Criteria left as None are not sent to the API. Datetimes without a timezone are
        treated as UTC. Results are collected across all pages for as long as a response
        carries a `next` link.

        Args:
            start: only jobs submitted after given time
            end: only jobs submitted before given time
            status_code: only jobs matching this status (SUCCEEDED, FAILED, RUNNING, PENDING)
            name: only jobs with this name
            job_type: only jobs with this job_type
            user_id: only jobs submitted by this user (defaults to the current user)

        Returns:
            A Batch object containing the found jobs
        """
        # Explicit mapping of query parameter name -> value (instead of peeking into locals())
        criteria = {
            'start': start,
            'end': end,
            'status_code': status_code,
            'name': name,
            'job_type': job_type,
            'user_id': user_id,
        }

        params = {}
        for param_name, param_value in criteria.items():
            if param_value is None:
                continue
            if isinstance(param_value, datetime):
                if param_value.tzinfo is None:
                    param_value = param_value.replace(tzinfo=timezone.utc)
                param_value = param_value.isoformat(timespec='seconds')
            params[param_name] = param_value

        response = self.session.get(urljoin(self.url, '/jobs'), params=params)
        _raise_for_hyp3_status(response)
        # Parse each page once and reuse the result
        page = response.json()
        jobs = [Job.from_dict(job) for job in page['jobs']]

        while 'next' in page:
            response = self.session.get(page['next'])
            _raise_for_hyp3_status(response)
            page = response.json()
            jobs.extend(Job.from_dict(job) for job in page['jobs'])

        return Batch(jobs)

    def get_job_by_id(self, job_id: str) -> Job:
        """Fetch a single job from the API by its ID

        Args:
            job_id: ID of the job to look up

        Returns:
            A Job object built from the API response
        """
        job_url = urljoin(self.url, f'/jobs/{job_id}')
        response = self.session.get(job_url)
        _raise_for_hyp3_status(response)

        job_info = response.json()
        return Job.from_dict(job_info)

    @singledispatchmethod
    def watch(self, job_or_batch: Union[Batch, Job], timeout: int = 10800,
              interval: Union[int, float] = 60) -> Union[Batch, Job]:
        """Watch jobs until they complete

        This is the generic single-dispatch entry point. Its own body only runs for
        argument types that have no registered implementation.

        Args:
            job_or_batch: The Batch or Job whose jobs should be watched
            timeout: Number of seconds to wait before giving up
            interval: Number of seconds between status checks

        Returns:
            A Batch or Job object with refreshed watched jobs

        Raises:
            NotImplementedError: When `job_or_batch` is of a type that cannot be watched
        """
        unsupported_type = type(job_or_batch)
        raise NotImplementedError(f'Cannot watch {unsupported_type} type object')

    @watch.register
    def _watch_batch(self, batch: Batch, timeout: int = 10800, interval: Union[int, float] = 60) -> Batch:
        """Refresh a Batch every `interval` seconds, with a progress bar, until it is complete.

        Returns the refreshed Batch once `batch.complete()` is true, and raises HyP3Error
        if that does not happen within the allotted number of checks.
        """
        make_bar = hyp3_sdk.util.get_tqdm_progress_bar()
        max_checks = math.ceil(timeout / interval)
        layout = '{l_bar}{bar}| {n_fmt}/{total_fmt} [{postfix[0]}]'
        initial_postfix = [f'timeout in {timeout} s']
        with make_bar(total=len(batch), bar_format=layout, postfix=initial_postfix) as progress_bar:
            for check in range(max_checks):
                batch = self.refresh(batch)

                status_counts = batch._count_statuses()
                finished = status_counts['SUCCEEDED'] + status_counts['FAILED']

                progress_bar.postfix = [f'timeout in {timeout - check * interval}s']
                # update() only increments n, so set the absolute count and redraw with update(0)
                progress_bar.n = finished
                progress_bar.update(0)

                if batch.complete():
                    return batch
                time.sleep(interval)
        raise HyP3Error(f'Timeout occurred while waiting for {batch}')

    @watch.register
    def _watch_job(self, job: Job, timeout: int = 10800, interval: Union[int, float] = 60) -> Job:
        """Refresh a Job every `interval` seconds, with a progress bar, until it is complete.

        Returns the refreshed Job once `job.complete()` is true, and raises HyP3Error
        if that does not happen within the allotted number of checks.
        """
        make_bar = hyp3_sdk.util.get_tqdm_progress_bar()
        max_checks = math.ceil(timeout / interval)
        layout = '{n_fmt}/{total_fmt} [{postfix[0]}]'
        initial_postfix = [f'timeout in {timeout} s']
        with make_bar(total=1, bar_format=layout, postfix=initial_postfix) as progress_bar:
            for check in range(max_checks):
                job = self.refresh(job)
                progress_bar.postfix = [f'timeout in {timeout - check * interval}s']

                # Advance the single-step bar by 1 once the job is done, by 0 otherwise
                done = job.complete()
                progress_bar.update(int(done))

                if done:
                    return job
                time.sleep(interval)
        raise HyP3Error(f'Timeout occurred while waiting for {job}')

    @singledispatchmethod
    def refresh(self, job_or_batch: Union[Batch, Job]) -> Union[Batch, Job]:
        """Refresh each job's information

        This is the generic single-dispatch entry point. Its own body only runs for
        argument types that have no registered implementation.

        Args:
            job_or_batch: A Batch or Job object to refresh

        Returns:
            A Batch or Job object with refreshed information

        Raises:
            NotImplementedError: When `job_or_batch` is of a type that cannot be refreshed
        """
        unsupported_type = type(job_or_batch)
        raise NotImplementedError(f'Cannot refresh {unsupported_type} type object')

    @refresh.register
    def _refresh_batch(self, batch: Batch):
        """Return a new Batch holding a refreshed copy of every job in `batch`."""
        return Batch([self.refresh(job) for job in batch.jobs])

    @refresh.register
    def _refresh_job(self, job: Job):
        """Return a fresh Job looked up by the ID of `job`."""
        current_id = job.job_id
        return self.get_job_by_id(current_id)

    def submit_prepared_jobs(self, prepared_jobs: Union[dict, List[dict]]) -> Batch:
        """Submit one or more prepared job dictionaries to the API

        Args:
            prepared_jobs: A prepared job dictionary, or list of prepared job dictionaries

        Returns:
            A Batch object containing the submitted job(s)
        """
        # A lone dictionary is wrapped so the payload always carries a list of jobs
        jobs_to_submit = [prepared_jobs] if isinstance(prepared_jobs, dict) else prepared_jobs

        response = self.session.post(urljoin(self.url, '/jobs'), json={'jobs': jobs_to_submit})
        _raise_for_hyp3_status(response)

        submitted = [Job.from_dict(job_info) for job_info in response.json()['jobs']]
        return Batch(submitted)

    def submit_autorift_job(self, granule1: str, granule2: str, name: Optional[str] = None) -> Batch:
        """Prepare an autoRIFT job for a pair of granules and submit it

        Args:
            granule1: The first granule (scene) to use
            granule2: The second granule (scene) to use
            name: A name for the job

        Returns:
            A Batch object containing the autoRIFT job
        """
        prepared = self.prepare_autorift_job(granule1, granule2, name=name)
        return self.submit_prepared_jobs(prepared_jobs=prepared)

    @classmethod
    def prepare_autorift_job(cls, granule1: str, granule2: str, name: Optional[str] = None) -> dict:
        """Prepare (but do not submit) an autoRIFT job

        Args:
            granule1: The first granule (scene) to use
            granule2: The second granule (scene) to use
            name: A name for the job; left out of the result when None

        Returns:
            A dictionary containing the prepared autoRIFT job
        """
        optional_fields = {} if name is None else {'name': name}
        return {
            'job_parameters': {'granules': [granule1, granule2]},
            'job_type': 'AUTORIFT',
            **optional_fields,
        }

    def submit_rtc_job(self,
                       granule: str,
                       name: Optional[str] = None,
                       dem_matching: bool = False,
                       include_dem: bool = False,
                       include_inc_map: bool = False,
                       include_rgb: bool = False,
                       include_scattering_area: bool = False,
                       radiometry: Literal['sigma0', 'gamma0'] = 'gamma0',
                       resolution: Literal[10, 20, 30] = 30,
                       scale: Literal['amplitude', 'decibel', 'power'] = 'power',
                       speckle_filter: bool = False,
                       dem_name: Literal['copernicus'] = 'copernicus') -> Batch:
        """Submit an RTC job

        Args:
            granule: The granule (scene) to use
            name: A name for the job
            dem_matching: Coregisters SAR data to the DEM, rather than using dead reckoning based on orbit files
            include_dem: Include the DEM file in the product package
            include_inc_map: Include the local incidence angle map in the product package
            include_rgb: Include a false-color RGB decomposition in the product package for dual-pol granules
                (ignored for single-pol granules)
            include_scattering_area: Include the scattering area in the product package
            radiometry: Backscatter coefficient normalization, either by ground area (sigma0) or illuminated area
                projected into the look direction (gamma0)
            resolution: Desired output pixel spacing in meters
            scale: Scale of output image; power, decibel or amplitude
            speckle_filter: Apply an Enhanced Lee speckle filter
            dem_name: Name of the DEM to use for processing. `copernicus` is the only option, and it will use
                the Copernicus GLO-30 Public DEM.

        Returns:
            A Batch object containing the RTC job
        """
        # Forward every option explicitly rather than mutating the mapping returned by locals()
        job_dict = self.prepare_rtc_job(
            granule=granule,
            name=name,
            dem_matching=dem_matching,
            include_dem=include_dem,
            include_inc_map=include_inc_map,
            include_rgb=include_rgb,
            include_scattering_area=include_scattering_area,
            radiometry=radiometry,
            resolution=resolution,
            scale=scale,
            speckle_filter=speckle_filter,
            dem_name=dem_name,
        )
        return self.submit_prepared_jobs(prepared_jobs=job_dict)

    @classmethod
    def prepare_rtc_job(cls,
                        granule: str,
                        name: Optional[str] = None,
                        dem_matching: bool = False,
                        include_dem: bool = False,
                        include_inc_map: bool = False,
                        include_rgb: bool = False,
                        include_scattering_area: bool = False,
                        radiometry: Literal['sigma0', 'gamma0'] = 'gamma0',
                        resolution: Literal[10, 20, 30] = 30,
                        scale: Literal['amplitude', 'decibel', 'power'] = 'power',
                        speckle_filter: bool = False,
                        dem_name: Literal['copernicus'] = 'copernicus') -> dict:
        """Prepare (but do not submit) an RTC job

        Args:
            granule: The granule (scene) to use
            name: A name for the job; left out of the result when None
            dem_matching: Coregisters SAR data to the DEM, rather than using dead reckoning based on orbit files
            include_dem: Include the DEM file in the product package
            include_inc_map: Include the local incidence angle map in the product package
            include_rgb: Include a false-color RGB decomposition in the product package for dual-pol granules
                (ignored for single-pol granules)
            include_scattering_area: Include the scattering area in the product package
            radiometry: Backscatter coefficient normalization, either by ground area (sigma0) or illuminated area
                projected into the look direction (gamma0)
            resolution: Desired output pixel spacing in meters
            scale: Scale of output image; power, decibel or amplitude
            speckle_filter: Apply an Enhanced Lee speckle filter
            dem_name: Name of the DEM to use for processing. `copernicus` is the only option, and it will use
                the Copernicus GLO-30 Public DEM.

        Returns:
            A dictionary containing the prepared RTC job
        """
        # Every option except `granule` and `name` is passed through as a job parameter
        job_parameters = {
            'granules': [granule],
            'dem_matching': dem_matching,
            'include_dem': include_dem,
            'include_inc_map': include_inc_map,
            'include_rgb': include_rgb,
            'include_scattering_area': include_scattering_area,
            'radiometry': radiometry,
            'resolution': resolution,
            'scale': scale,
            'speckle_filter': speckle_filter,
            'dem_name': dem_name,
        }

        optional_fields = {} if name is None else {'name': name}
        return {
            'job_parameters': job_parameters,
            'job_type': 'RTC_GAMMA',
            **optional_fields,
        }

    def submit_insar_job(self,
                         granule1: str,
                         granule2: str,
                         name: Optional[str] = None,
                         include_look_vectors: bool = False,
                         include_los_displacement: bool = False,
                         include_inc_map: bool = False,
                         looks: Literal['20x4', '10x2'] = '20x4',
                         include_dem: bool = False,
                         include_wrapped_phase: bool = False,
                         apply_water_mask: bool = False,
                         include_displacement_maps: bool = False,
                         phase_filter_parameter: float = 0.6) -> Batch:
        """Prepare an InSAR job for a pair of granules and submit it

        Args:
            granule1: The first granule (scene) to use
            granule2: The second granule (scene) to use
            name: A name for the job
            include_look_vectors: Include the look vector theta and phi files in the product package
            include_los_displacement: Include a GeoTIFF in the product package containing displacement values
                along the Line-Of-Sight (LOS). This parameter has been deprecated in favor of
                `include_displacement_maps`, and will be removed in a future release.
            include_inc_map: Include the local and ellipsoidal incidence angle maps in the product package
            looks: Number of looks to take in range and azimuth
            include_dem: Include the digital elevation model GeoTIFF in the product package
            include_wrapped_phase: Include the wrapped phase GeoTIFF in the product package
            apply_water_mask: Sets pixels over coastal waters and large inland waterbodies
                as invalid for phase unwrapping
            include_displacement_maps: Include displacement maps (line-of-sight and vertical) in the product package
            phase_filter_parameter: Adaptive phase filter parameter.
                Useful values fall in the range 0.2 to 1.
                Larger values result in stronger filtering.
                If zero, adaptive phase filter will be skipped.

        Returns:
            A Batch object containing the InSAR job
        """
        prepared = self.prepare_insar_job(
            granule1=granule1,
            granule2=granule2,
            name=name,
            include_look_vectors=include_look_vectors,
            include_los_displacement=include_los_displacement,
            include_inc_map=include_inc_map,
            looks=looks,
            include_dem=include_dem,
            include_wrapped_phase=include_wrapped_phase,
            apply_water_mask=apply_water_mask,
            include_displacement_maps=include_displacement_maps,
            phase_filter_parameter=phase_filter_parameter,
        )
        return self.submit_prepared_jobs(prepared_jobs=prepared)

    @classmethod
    def prepare_insar_job(cls,
                          granule1: str,
                          granule2: str,
                          name: Optional[str] = None,
                          include_look_vectors: bool = False,
                          include_los_displacement: bool = False,
                          include_inc_map: bool = False,
                          looks: Literal['20x4', '10x2'] = '20x4',
                          include_dem: bool = False,
                          include_wrapped_phase: bool = False,
                          apply_water_mask: bool = False,
                          include_displacement_maps: bool = False,
                          phase_filter_parameter: float = 0.6) -> dict:
        """Prepare (but do not submit) an InSAR job

        A FutureWarning is issued when `include_los_displacement` is set.

        Args:
            granule1: The first granule (scene) to use
            granule2: The second granule (scene) to use
            name: A name for the job; left out of the result when None
            include_look_vectors: Include the look vector theta and phi files in the product package
            include_los_displacement: Include a GeoTIFF in the product package containing displacement values
                along the Line-Of-Sight (LOS). This parameter has been deprecated in favor of
                `include_displacement_maps`, and will be removed in a future release.
            include_inc_map: Include the local and ellipsoidal incidence angle maps in the product package
            looks: Number of looks to take in range and azimuth
            include_dem: Include the digital elevation model GeoTIFF in the product package
            include_wrapped_phase: Include the wrapped phase GeoTIFF in the product package
            apply_water_mask: Sets pixels over coastal waters and large inland waterbodies
                as invalid for phase unwrapping
            include_displacement_maps: Include displacement maps (line-of-sight and vertical) in the product package
            phase_filter_parameter: Adaptive phase filter parameter.
                Useful values fall in the range 0.2 to 1.
                Larger values result in stronger filtering.
                If zero, adaptive phase filter will be skipped.

        Returns:
            A dictionary containing the prepared InSAR job
        """
        if include_los_displacement:
            deprecation_message = ('The include_los_displacement parameter has been deprecated in favor of '
                                   'include_displacement_maps, and will be removed in a future release.')
            warnings.warn(deprecation_message, FutureWarning)

        # Every option except the granules and `name` is passed through as a job parameter
        job_parameters = {
            'granules': [granule1, granule2],
            'include_look_vectors': include_look_vectors,
            'include_los_displacement': include_los_displacement,
            'include_inc_map': include_inc_map,
            'looks': looks,
            'include_dem': include_dem,
            'include_wrapped_phase': include_wrapped_phase,
            'apply_water_mask': apply_water_mask,
            'include_displacement_maps': include_displacement_maps,
            'phase_filter_parameter': phase_filter_parameter,
        }

        optional_fields = {} if name is None else {'name': name}
        return {
            'job_parameters': job_parameters,
            'job_type': 'INSAR_GAMMA',
            **optional_fields,
        }

    def submit_insar_isce_burst_job(self,
                                    granule1: str,
                                    granule2: str,
                                    name: Optional[str] = None,
                                    apply_water_mask: bool = False,
                                    looks: Literal['20x4', '10x2', '5x1'] = '20x4') -> Batch:
        """Prepare an InSAR ISCE burst job for a pair of granules and submit it.

        Args:
            granule1: The first granule (scene) to use
            granule2: The second granule (scene) to use
            name: A name for the job
            apply_water_mask: Sets pixels over coastal waters and large inland waterbodies
                as invalid for phase unwrapping
            looks: Number of looks to take in range and azimuth

        Returns:
            A Batch object containing the InSAR ISCE burst job
        """
        prepared = self.prepare_insar_isce_burst_job(
            granule1=granule1,
            granule2=granule2,
            name=name,
            apply_water_mask=apply_water_mask,
            looks=looks,
        )
        return self.submit_prepared_jobs(prepared_jobs=prepared)

    @classmethod
    def prepare_insar_isce_burst_job(cls,
                                     granule1: str,
                                     granule2: str,
                                     name: Optional[str] = None,
                                     apply_water_mask: bool = False,
                                     looks: Literal['20x4', '10x2', '5x1'] = '20x4') -> dict:
        """Prepare (but do not submit) an InSAR ISCE burst job.

        Args:
            granule1: The first granule (scene) to use
            granule2: The second granule (scene) to use
            name: A name for the job; left out of the result when None
            apply_water_mask: Sets pixels over coastal waters and large inland waterbodies
                as invalid for phase unwrapping
            looks: Number of looks to take in range and azimuth

        Returns:
            A dictionary containing the prepared InSAR ISCE burst job
        """
        job_parameters = {
            'granules': [granule1, granule2],
            'apply_water_mask': apply_water_mask,
            'looks': looks,
        }

        optional_fields = {} if name is None else {'name': name}
        return {
            'job_parameters': job_parameters,
            'job_type': 'INSAR_ISCE_BURST',
            **optional_fields,
        }

    def my_info(self) -> dict:
        """Fetch the `/user` endpoint of the API

        Returns:
            Your user information
        """
        user_url = urljoin(self.url, '/user')
        response = self.session.get(user_url)
        _raise_for_hyp3_status(response)
        return response.json()

    def check_credits(self) -> Union[float, int, None]:
        """Look up the `remaining_credits` entry of your user information

        Returns:
            Your remaining processing credits, or None if you have no processing limit
        """
        return self.my_info()['remaining_credits']

    def check_quota(self) -> Union[float, int, None]:
        """Deprecated method for checking your remaining processing credits; replaced by `HyP3.check_credits`

        Issues a DeprecationWarning and then delegates to `check_credits`.

        Returns:
            Your remaining processing credits, or None if you have no processing limit
        """
        deprecation_message = ('This method is deprecated and will be removed in a future release.\n'
                               'Please use `HyP3.check_credits` instead.')
        # stacklevel=2 attributes the warning to the caller of check_quota
        warn(deprecation_message, DeprecationWarning, stacklevel=2)
        return self.check_credits()

    def costs(self) -> dict:
        """Fetch the `/costs` endpoint of the API

        Returns:
            Table of job costs
        """
        costs_url = urljoin(self.url, '/costs')
        response = self.session.get(costs_url)
        _raise_for_hyp3_status(response)
        return response.json()
__init__(api_url=PROD_API, username=None, password=None, prompt=False)

If username and password are not provided, attempts to use credentials from a .netrc file.

Parameters:

Name Type Description Default
api_url str

Address of the HyP3 API

PROD_API
username Optional[str]

Username for authenticating to urs.earthdata.nasa.gov. Both username and password must be provided if either is provided.

None
password Optional[str]

Password for authenticating to urs.earthdata.nasa.gov. Both username and password must be provided if either is provided.

None
prompt bool

Prompt for username and/or password interactively when they are not provided as keyword parameters

False
Source code in hyp3_sdk/hyp3.py
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
def __init__(self, api_url: str = PROD_API, username: Optional[str] = None, password: Optional[str] = None,
             prompt: bool = False):
    """If username and password are not provided, attempts to use credentials from a `.netrc` file.

    Stores the API address, optionally prompts for missing credentials, and builds an
    authenticated session whose `User-Agent` header identifies this SDK and its version.

    Args:
        api_url: Address of the HyP3 API
        username: Username for authenticating to `urs.earthdata.nasa.gov`.
            Both username and password must be provided if either is provided.
        password: Password for authenticating to `urs.earthdata.nasa.gov`.
            Both username and password must be provided if either is provided.
        prompt: Prompt for username and/or password interactively when they
            are not provided as keyword parameters
    """
    self.url = api_url

    if prompt:
        # Only ask for the pieces the caller did not supply.
        if username is None:
            username = input('NASA Earthdata Login username: ')
        if password is None:
            password = getpass('NASA Earthdata Login password: ')

    self.session = hyp3_sdk.util.get_authenticated_session(username, password)
    user_agent = f'{hyp3_sdk.__name__}/{hyp3_sdk.__version__}'
    self.session.headers.update({'User-Agent': user_agent})
check_credits()

Returns:

Type Description
Union[float, int, None]

Your remaining processing credits, or None if you have no processing limit

Source code in hyp3_sdk/hyp3.py
491
492
493
494
495
496
497
def check_credits(self) -> Union[float, int, None]:
    """Read the `remaining_credits` entry from `my_info()`.

    Returns:
        Your remaining processing credits, or None if you have no processing limit
    """
    return self.my_info()['remaining_credits']
check_quota()

Deprecated method for checking your remaining processing credits; replaced by HyP3.check_credits

Returns:

Type Description
Union[float, int, None]

Your remaining processing credits, or None if you have no processing limit

Source code in hyp3_sdk/hyp3.py
499
500
501
502
503
504
505
506
507
def check_quota(self) -> Union[float, int, None]:
    """Deprecated method for checking your remaining processing credits; replaced by `HyP3.check_credits`

    Emits a `DeprecationWarning` and then delegates to `check_credits`.

    Returns:
        Your remaining processing credits, or None if you have no processing limit
    """
    message = ('This method is deprecated and will be removed in a future release.\n'
               'Please use `HyP3.check_credits` instead.')
    # stacklevel=2 attributes the warning to the caller rather than to this method.
    warn(message, DeprecationWarning, stacklevel=2)
    return self.check_credits()
costs()

Returns:

Type Description
dict

Table of job costs

Source code in hyp3_sdk/hyp3.py
509
510
511
512
513
514
515
516
def costs(self) -> dict:
    """Request the `/costs` endpoint of the API and decode its JSON body.

    Returns:
        Table of job costs
    """
    costs_url = urljoin(self.url, '/costs')
    reply = self.session.get(costs_url)
    # Helper defined elsewhere in this module; presumably raises on an error response.
    _raise_for_hyp3_status(reply)
    return reply.json()
find_jobs(start=None, end=None, status_code=None, name=None, job_type=None, user_id=None)

Gets a Batch of jobs from HyP3 matching the provided search criteria

Parameters:

Name Type Description Default
start Optional[datetime]

only jobs submitted after given time

None
end Optional[datetime]

only jobs submitted before given time

None
status_code Optional[str]

only jobs matching this status (SUCCEEDED, FAILED, RUNNING, PENDING)

None
name Optional[str]

only jobs with this name

None
job_type Optional[str]

only jobs with this job_type

None
user_id Optional[str]

only jobs submitted by this user (defaults to the current user)

None

Returns:

Type Description
Batch

A Batch object containing the found jobs

Source code in hyp3_sdk/hyp3.py
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
def find_jobs(self,
              start: Optional[datetime] = None,
              end: Optional[datetime] = None,
              status_code: Optional[str] = None,
              name: Optional[str] = None,
              job_type: Optional[str] = None,
              user_id: Optional[str] = None) -> Batch:
    """Gets a Batch of jobs from HyP3 matching the provided search criteria

    Criteria left as None are omitted from the query. `datetime` values are sent as
    ISO 8601 strings with second precision; naive datetimes are treated as UTC.
    Results are collected across pages by following each response's `next` link.

    Args:
        start: only jobs submitted after given time
        end: only jobs submitted before given time
        status_code: only jobs matching this status (SUCCEEDED, FAILED, RUNNING, PENDING)
        name: only jobs with this name
        job_type: only jobs with this job_type
        user_id: only jobs submitted by this user (defaults to the current user)

    Returns:
        A Batch object containing the found jobs
    """
    # Explicit mapping instead of locals() introspection, so that locals added to this
    # function later can never leak into the query string.
    criteria = {
        'start': start,
        'end': end,
        'status_code': status_code,
        'name': name,
        'job_type': job_type,
        'user_id': user_id,
    }

    params = {}
    for param_name, param_value in criteria.items():
        if param_value is None:
            continue
        if isinstance(param_value, datetime):
            if param_value.tzinfo is None:
                param_value = param_value.replace(tzinfo=timezone.utc)
            param_value = param_value.isoformat(timespec='seconds')
        params[param_name] = param_value

    response = self.session.get(urljoin(self.url, '/jobs'), params=params)
    _raise_for_hyp3_status(response)
    # Decode each page once; calling response.json() repeatedly re-parses the body.
    page = response.json()
    jobs = [Job.from_dict(job) for job in page['jobs']]

    while 'next' in page:
        response = self.session.get(page['next'])
        _raise_for_hyp3_status(response)
        page = response.json()
        jobs.extend(Job.from_dict(job) for job in page['jobs'])

    return Batch(jobs)
get_job_by_id(job_id)

Get job by job ID

Parameters:

Name Type Description Default
job_id str

A job ID

required

Returns:

Type Description
Job

A Job object

Source code in hyp3_sdk/hyp3.py
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
def get_job_by_id(self, job_id: str) -> Job:
    """Get job by job ID

    Requests `/jobs/<job_id>` from the API and builds a Job from the JSON response.

    Args:
        job_id: A job ID

    Returns:
        A Job object
    """
    job_url = urljoin(self.url, f'/jobs/{job_id}')
    reply = self.session.get(job_url)
    # Helper defined elsewhere in this module; presumably raises on an error response.
    _raise_for_hyp3_status(reply)
    return Job.from_dict(reply.json())
my_info()

Returns:

Type Description
dict

Your user information

Source code in hyp3_sdk/hyp3.py
482
483
484
485
486
487
488
489
def my_info(self) -> dict:
    """Request the `/user` endpoint of the API and decode its JSON body.

    Returns:
        Your user information
    """
    user_url = urljoin(self.url, '/user')
    reply = self.session.get(user_url)
    # Helper defined elsewhere in this module; presumably raises on an error response.
    _raise_for_hyp3_status(reply)
    return reply.json()
prepare_autorift_job(granule1, granule2, name=None) classmethod

Prepare an autoRIFT job

Parameters:

Name Type Description Default
granule1 str

The first granule (scene) to use

required
granule2 str

The second granule (scene) to use

required
name Optional[str]

A name for the job

None

Returns:

Type Description
dict

A dictionary containing the prepared autoRIFT job

Source code in hyp3_sdk/hyp3.py
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
@classmethod
def prepare_autorift_job(cls, granule1: str, granule2: str, name: Optional[str] = None) -> dict:
    """Prepare an autoRIFT job

    Only builds the job dictionary; nothing is sent to the API.

    Args:
        granule1: The first granule (scene) to use
        granule2: The second granule (scene) to use
        name: A name for the job

    Returns:
        A dictionary containing the prepared autoRIFT job
    """
    prepared = dict(
        job_parameters={'granules': [granule1, granule2]},
        job_type='AUTORIFT',
    )
    # The 'name' key is only present when a name was given.
    if name is None:
        return prepared
    prepared['name'] = name
    return prepared
prepare_insar_isce_burst_job(granule1, granule2, name=None, apply_water_mask=False, looks='20x4') classmethod

Prepare an InSAR ISCE burst job.

Parameters:

Name Type Description Default
granule1 str

The first granule (scene) to use

required
granule2 str

The second granule (scene) to use

required
name Optional[str]

A name for the job

None
apply_water_mask bool

Sets pixels over coastal waters and large inland waterbodies as invalid for phase unwrapping

False
looks Literal['20x4', '10x2', '5x1']

Number of looks to take in range and azimuth

'20x4'

Returns:

Type Description
dict

A dictionary containing the prepared InSAR ISCE burst job

Source code in hyp3_sdk/hyp3.py
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
@classmethod
def prepare_insar_isce_burst_job(cls,
                                 granule1: str,
                                 granule2: str,
                                 name: Optional[str] = None,
                                 apply_water_mask: bool = False,
                                 looks: Literal['20x4', '10x2', '5x1'] = '20x4') -> dict:
    """Prepare an InSAR ISCE burst job.

    Only builds the job dictionary; nothing is sent to the API.

    Args:
        granule1: The first granule (scene) to use
        granule2: The second granule (scene) to use
        name: A name for the job
        apply_water_mask: Sets pixels over coastal waters and large inland waterbodies
            as invalid for phase unwrapping
        looks: Number of looks to take in range and azimuth

    Returns:
        A dictionary containing the prepared InSAR ISCE burst job
    """
    prepared = {
        'job_parameters': {
            'granules': [granule1, granule2],
            'apply_water_mask': apply_water_mask,
            'looks': looks,
        },
        'job_type': 'INSAR_ISCE_BURST',
    }
    # The 'name' key is only present when a name was given.
    if name is None:
        return prepared
    prepared['name'] = name
    return prepared
prepare_insar_job(granule1, granule2, name=None, include_look_vectors=False, include_los_displacement=False, include_inc_map=False, looks='20x4', include_dem=False, include_wrapped_phase=False, apply_water_mask=False, include_displacement_maps=False, phase_filter_parameter=0.6) classmethod

Prepare an InSAR job

Parameters:

Name Type Description Default
granule1 str

The first granule (scene) to use

required
granule2 str

The second granule (scene) to use

required
name Optional[str]

A name for the job

None
include_look_vectors bool

Include the look vector theta and phi files in the product package

False
include_los_displacement bool

Include a GeoTIFF in the product package containing displacement values along the Line-Of-Sight (LOS). This parameter has been deprecated in favor of include_displacement_maps, and will be removed in a future release.

False
include_inc_map bool

Include the local and ellipsoidal incidence angle maps in the product package

False
looks Literal['20x4', '10x2']

Number of looks to take in range and azimuth

'20x4'
include_dem bool

Include the digital elevation model GeoTIFF in the product package

False
include_wrapped_phase bool

Include the wrapped phase GeoTIFF in the product package

False
apply_water_mask bool

Sets pixels over coastal waters and large inland waterbodies as invalid for phase unwrapping

False
include_displacement_maps bool

Include displacement maps (line-of-sight and vertical) in the product package

False
phase_filter_parameter float

Adaptive phase filter parameter. Useful values fall in the range 0.2 to 1. Larger values result in stronger filtering. If zero, adaptive phase filter will be skipped.

0.6

Returns:

Type Description
dict

A dictionary containing the prepared InSAR job

Source code in hyp3_sdk/hyp3.py
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
@classmethod
def prepare_insar_job(cls,
                      granule1: str,
                      granule2: str,
                      name: Optional[str] = None,
                      include_look_vectors: bool = False,
                      include_los_displacement: bool = False,
                      include_inc_map: bool = False,
                      looks: Literal['20x4', '10x2'] = '20x4',
                      include_dem: bool = False,
                      include_wrapped_phase: bool = False,
                      apply_water_mask: bool = False,
                      include_displacement_maps: bool = False,
                      phase_filter_parameter: float = 0.6) -> dict:
    """Prepare an InSAR job

    Only builds the job dictionary; nothing is sent to the API. A `FutureWarning` is
    issued when `include_los_displacement` is truthy.

    Args:
        granule1: The first granule (scene) to use
        granule2: The second granule (scene) to use
        name: A name for the job
        include_look_vectors: Include the look vector theta and phi files in the product package
        include_los_displacement: Include a GeoTIFF in the product package containing displacement values
            along the Line-Of-Sight (LOS). This parameter has been deprecated in favor of
            `include_displacement_maps`, and will be removed in a future release.
        include_inc_map: Include the local and ellipsoidal incidence angle maps in the product package
        looks: Number of looks to take in range and azimuth
        include_dem: Include the digital elevation model GeoTIFF in the product package
        include_wrapped_phase: Include the wrapped phase GeoTIFF in the product package
        apply_water_mask: Sets pixels over coastal waters and large inland waterbodies
            as invalid for phase unwrapping
        include_displacement_maps: Include displacement maps (line-of-sight and vertical) in the product package
        phase_filter_parameter: Adaptive phase filter parameter.
            Useful values fall in the range 0.2 to 1.
            Larger values result in stronger filtering.
            If zero, adaptive phase filter will be skipped.

    Returns:
        A dictionary containing the prepared InSAR job
    """
    if include_los_displacement:
        warnings.warn('The include_los_displacement parameter has been deprecated in favor of '
                      'include_displacement_maps, and will be removed in a future release.', FutureWarning)

    prepared = {
        'job_parameters': {
            'granules': [granule1, granule2],
            'include_look_vectors': include_look_vectors,
            'include_los_displacement': include_los_displacement,
            'include_inc_map': include_inc_map,
            'looks': looks,
            'include_dem': include_dem,
            'include_wrapped_phase': include_wrapped_phase,
            'apply_water_mask': apply_water_mask,
            'include_displacement_maps': include_displacement_maps,
            'phase_filter_parameter': phase_filter_parameter,
        },
        'job_type': 'INSAR_GAMMA',
    }
    # The 'name' key is only present when a name was given.
    if name is None:
        return prepared
    prepared['name'] = name
    return prepared
prepare_rtc_job(granule, name=None, dem_matching=False, include_dem=False, include_inc_map=False, include_rgb=False, include_scattering_area=False, radiometry='gamma0', resolution=30, scale='power', speckle_filter=False, dem_name='copernicus') classmethod

Prepare an RTC job

Parameters:

Name Type Description Default
granule str

The granule (scene) to use

required
name Optional[str]

A name for the job

None
dem_matching bool

Coregisters SAR data to the DEM, rather than using dead reckoning based on orbit files

False
include_dem bool

Include the DEM file in the product package

False
include_inc_map bool

Include the local incidence angle map in the product package

False
include_rgb bool

Include a false-color RGB decomposition in the product package for dual-pol granules (ignored for single-pol granules)

False
include_scattering_area bool

Include the scattering area in the product package

False
radiometry Literal['sigma0', 'gamma0']

Backscatter coefficient normalization, either by ground area (sigma0) or illuminated area projected into the look direction (gamma0)

'gamma0'
resolution Literal[10, 20, 30]

Desired output pixel spacing in meters

30
scale Literal['amplitude', 'decibel', 'power']

Scale of output image; power, decibel or amplitude

'power'
speckle_filter bool

Apply an Enhanced Lee speckle filter

False
dem_name Literal['copernicus']

Name of the DEM to use for processing. copernicus is the only option, and it will use the Copernicus GLO-30 Public DEM.

'copernicus'

Returns:

Type Description
dict

A dictionary containing the prepared RTC job

Source code in hyp3_sdk/hyp3.py
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
@classmethod
def prepare_rtc_job(cls,
                    granule: str,
                    name: Optional[str] = None,
                    dem_matching: bool = False,
                    include_dem: bool = False,
                    include_inc_map: bool = False,
                    include_rgb: bool = False,
                    include_scattering_area: bool = False,
                    radiometry: Literal['sigma0', 'gamma0'] = 'gamma0',
                    resolution: Literal[10, 20, 30] = 30,
                    scale: Literal['amplitude', 'decibel', 'power'] = 'power',
                    speckle_filter: bool = False,
                    dem_name: Literal['copernicus'] = 'copernicus') -> dict:
    """Prepare an RTC job

    Only builds the job dictionary; nothing is sent to the API.

    Args:
        granule: The granule (scene) to use
        name: A name for the job
        dem_matching: Coregisters SAR data to the DEM, rather than using dead reckoning based on orbit files
        include_dem: Include the DEM file in the product package
        include_inc_map: Include the local incidence angle map in the product package
        include_rgb: Include a false-color RGB decomposition in the product package for dual-pol granules
            (ignored for single-pol granules)
        include_scattering_area: Include the scattering area in the product package
        radiometry: Backscatter coefficient normalization, either by ground area (sigma0) or illuminated area
            projected into the look direction (gamma0)
        resolution: Desired output pixel spacing in meters
        scale: Scale of output image; power, decibel or amplitude
        speckle_filter: Apply an Enhanced Lee speckle filter
        dem_name: Name of the DEM to use for processing. `copernicus` is the only option, and it will use
            the Copernicus GLO-30 Public DEM.

    Returns:
        A dictionary containing the prepared RTC job
    """
    prepared = {
        'job_parameters': {
            'granules': [granule],
            'dem_matching': dem_matching,
            'include_dem': include_dem,
            'include_inc_map': include_inc_map,
            'include_rgb': include_rgb,
            'include_scattering_area': include_scattering_area,
            'radiometry': radiometry,
            'resolution': resolution,
            'scale': scale,
            'speckle_filter': speckle_filter,
            'dem_name': dem_name,
        },
        'job_type': 'RTC_GAMMA',
    }
    # The 'name' key is only present when a name was given.
    if name is None:
        return prepared
    prepared['name'] = name
    return prepared
refresh(job_or_batch)

Refresh each job's information

Parameters:

Name Type Description Default
job_or_batch Union[Batch, Job]

A Batch or Job object to refresh

required

Returns:

Type Description
Union[Batch, Job]

A Batch or Job object with refreshed information

Source code in hyp3_sdk/hyp3.py
160
161
162
163
164
165
166
167
168
169
170
@singledispatchmethod
def refresh(self, job_or_batch: Union[Batch, Job]) -> Union[Batch, Job]:
    """Refresh each job's information

    This is the fallback of a `singledispatchmethod`: it only raises. Handlers for
    the supported types are presumably registered elsewhere in the class.

    Args:
        job_or_batch: A Batch or Job object to refresh

    Returns:
        A Batch or Job object with refreshed information

    Raises:
        NotImplementedError: always, when dispatch reaches this fallback
    """
    unsupported = type(job_or_batch)
    raise NotImplementedError(f'Cannot refresh {unsupported} type object')
submit_autorift_job(granule1, granule2, name=None)

Submit an autoRIFT job

Parameters:

Name Type Description Default
granule1 str

The first granule (scene) to use

required
granule2 str

The second granule (scene) to use

required
name Optional[str]

A name for the job

None

Returns:

Type Description
Batch

A Batch object containing the autoRIFT job

Source code in hyp3_sdk/hyp3.py
205
206
207
208
209
210
211
212
213
214
215
216
217
def submit_autorift_job(self, granule1: str, granule2: str, name: Optional[str] = None) -> Batch:
    """Submit an autoRIFT job

    Builds the job with `prepare_autorift_job` and hands it to `submit_prepared_jobs`.

    Args:
        granule1: The first granule (scene) to use
        granule2: The second granule (scene) to use
        name: A name for the job

    Returns:
        A Batch object containing the autoRIFT job
    """
    prepared = self.prepare_autorift_job(granule1, granule2, name=name)
    return self.submit_prepared_jobs(prepared_jobs=prepared)
submit_insar_isce_burst_job(granule1, granule2, name=None, apply_water_mask=False, looks='20x4')

Submit an InSAR ISCE burst job.

Parameters:

Name Type Description Default
granule1 str

The first granule (scene) to use

required
granule2 str

The second granule (scene) to use

required
name Optional[str]

A name for the job

None
apply_water_mask bool

Sets pixels over coastal waters and large inland waterbodies as invalid for phase unwrapping

False
looks Literal['20x4', '10x2', '5x1']

Number of looks to take in range and azimuth

'20x4'

Returns:

Type Description
Batch

A Batch object containing the InSAR ISCE burst job

Source code in hyp3_sdk/hyp3.py
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
def submit_insar_isce_burst_job(self,
                                granule1: str,
                                granule2: str,
                                name: Optional[str] = None,
                                apply_water_mask: bool = False,
                                looks: Literal['20x4', '10x2', '5x1'] = '20x4') -> Batch:
    """Submit an InSAR ISCE burst job.

    Builds the job with `prepare_insar_isce_burst_job` and hands it to `submit_prepared_jobs`.

    Args:
        granule1: The first granule (scene) to use
        granule2: The second granule (scene) to use
        name: A name for the job
        apply_water_mask: Sets pixels over coastal waters and large inland waterbodies
            as invalid for phase unwrapping
        looks: Number of looks to take in range and azimuth

    Returns:
        A Batch object containing the InSAR ISCE burst job
    """
    prepared = self.prepare_insar_isce_burst_job(
        granule1=granule1,
        granule2=granule2,
        name=name,
        apply_water_mask=apply_water_mask,
        looks=looks,
    )
    return self.submit_prepared_jobs(prepared_jobs=prepared)
submit_insar_job(granule1, granule2, name=None, include_look_vectors=False, include_los_displacement=False, include_inc_map=False, looks='20x4', include_dem=False, include_wrapped_phase=False, apply_water_mask=False, include_displacement_maps=False, phase_filter_parameter=0.6)

Submit an InSAR job

Parameters:

Name Type Description Default
granule1 str

The first granule (scene) to use

required
granule2 str

The second granule (scene) to use

required
name Optional[str]

A name for the job

None
include_look_vectors bool

Include the look vector theta and phi files in the product package

False
include_los_displacement bool

Include a GeoTIFF in the product package containing displacement values along the Line-Of-Sight (LOS). This parameter has been deprecated in favor of include_displacement_maps, and will be removed in a future release.

False
include_inc_map bool

Include the local and ellipsoidal incidence angle maps in the product package

False
looks Literal['20x4', '10x2']

Number of looks to take in range and azimuth

'20x4'
include_dem bool

Include the digital elevation model GeoTIFF in the product package

False
include_wrapped_phase bool

Include the wrapped phase GeoTIFF in the product package

False
apply_water_mask bool

Sets pixels over coastal waters and large inland waterbodies as invalid for phase unwrapping

False
include_displacement_maps bool

Include displacement maps (line-of-sight and vertical) in the product package

False
phase_filter_parameter float

Adaptive phase filter parameter. Useful values fall in the range 0.2 to 1. Larger values result in stronger filtering. If zero, adaptive phase filter will be skipped.

0.6

Returns:

Type Description
Batch

A Batch object containing the InSAR job

Source code in hyp3_sdk/hyp3.py
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
def submit_insar_job(self,
                     granule1: str,
                     granule2: str,
                     name: Optional[str] = None,
                     include_look_vectors: bool = False,
                     include_los_displacement: bool = False,
                     include_inc_map: bool = False,
                     looks: Literal['20x4', '10x2'] = '20x4',
                     include_dem: bool = False,
                     include_wrapped_phase: bool = False,
                     apply_water_mask: bool = False,
                     include_displacement_maps: bool = False,
                     phase_filter_parameter: float = 0.6) -> Batch:
    """Submit an InSAR job

    Builds the job with `prepare_insar_job` and hands it to `submit_prepared_jobs`.

    Args:
        granule1: The first granule (scene) to use
        granule2: The second granule (scene) to use
        name: A name for the job
        include_look_vectors: Include the look vector theta and phi files in the product package
        include_los_displacement: Include a GeoTIFF in the product package containing displacement values
            along the Line-Of-Sight (LOS). This parameter has been deprecated in favor of
            `include_displacement_maps`, and will be removed in a future release.
        include_inc_map: Include the local and ellipsoidal incidence angle maps in the product package
        looks: Number of looks to take in range and azimuth
        include_dem: Include the digital elevation model GeoTIFF in the product package
        include_wrapped_phase: Include the wrapped phase GeoTIFF in the product package
        apply_water_mask: Sets pixels over coastal waters and large inland waterbodies
            as invalid for phase unwrapping
        include_displacement_maps: Include displacement maps (line-of-sight and vertical) in the product package
        phase_filter_parameter: Adaptive phase filter parameter.
            Useful values fall in the range 0.2 to 1.
            Larger values result in stronger filtering.
            If zero, adaptive phase filter will be skipped.

    Returns:
        A Batch object containing the InSAR job
    """
    prepared = self.prepare_insar_job(
        granule1=granule1,
        granule2=granule2,
        name=name,
        include_look_vectors=include_look_vectors,
        include_los_displacement=include_los_displacement,
        include_inc_map=include_inc_map,
        looks=looks,
        include_dem=include_dem,
        include_wrapped_phase=include_wrapped_phase,
        apply_water_mask=apply_water_mask,
        include_displacement_maps=include_displacement_maps,
        phase_filter_parameter=phase_filter_parameter,
    )
    return self.submit_prepared_jobs(prepared_jobs=prepared)
submit_prepared_jobs(prepared_jobs)

Submit a prepared job dictionary, or list of prepared job dictionaries

Parameters:

Name Type Description Default
prepared_jobs Union[dict, List[dict]]

A prepared job dictionary, or list of prepared job dictionaries

required

Returns:

Type Description
Batch

A Batch object containing the submitted job(s)

Source code in hyp3_sdk/hyp3.py
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
def submit_prepared_jobs(self, prepared_jobs: Union[dict, List[dict]]) -> Batch:
    """Submit a prepared job dictionary, or list of prepared job dictionaries

    The job(s) are POSTed to `/jobs` under a top-level `jobs` key, and every entry of
    the response's `jobs` list is turned into a Job and collected into a Batch.

    Args:
        prepared_jobs: A prepared job dictionary, or list of prepared job dictionaries

    Returns:
        A Batch object containing the submitted job(s)
    """
    # A single dictionary is wrapped so the payload always carries a list.
    jobs_to_send = [prepared_jobs] if isinstance(prepared_jobs, dict) else prepared_jobs

    response = self.session.post(urljoin(self.url, '/jobs'), json={'jobs': jobs_to_send})
    _raise_for_hyp3_status(response)

    submitted = Batch()
    for job_record in response.json()['jobs']:
        submitted += Job.from_dict(job_record)
    return submitted
submit_rtc_job(granule, name=None, dem_matching=False, include_dem=False, include_inc_map=False, include_rgb=False, include_scattering_area=False, radiometry='gamma0', resolution=30, scale='power', speckle_filter=False, dem_name='copernicus')

Submit an RTC job

Parameters:

Name Type Description Default
granule str

The granule (scene) to use

required
name Optional[str]

A name for the job

None
dem_matching bool

Coregisters SAR data to the DEM, rather than using dead reckoning based on orbit files

False
include_dem bool

Include the DEM file in the product package

False
include_inc_map bool

Include the local incidence angle map in the product package

False
include_rgb bool

Include a false-color RGB decomposition in the product package for dual-pol granules (ignored for single-pol granules)

False
include_scattering_area bool

Include the scattering area in the product package

False
radiometry Literal['sigma0', 'gamma0']

Backscatter coefficient normalization, either by ground area (sigma0) or illuminated area projected into the look direction (gamma0)

'gamma0'
resolution Literal[10, 20, 30]

Desired output pixel spacing in meters

30
scale Literal['amplitude', 'decibel', 'power']

Scale of output image; power, decibel or amplitude

'power'
speckle_filter bool

Apply an Enhanced Lee speckle filter

False
dem_name Literal['copernicus']

Name of the DEM to use for processing. copernicus is the only option, and it will use the Copernicus GLO-30 Public DEM.

'copernicus'

Returns:

Type Description
Batch

A Batch object containing the RTC job

Source code in hyp3_sdk/hyp3.py
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
def submit_rtc_job(self,
                   granule: str,
                   name: Optional[str] = None,
                   dem_matching: bool = False,
                   include_dem: bool = False,
                   include_inc_map: bool = False,
                   include_rgb: bool = False,
                   include_scattering_area: bool = False,
                   radiometry: Literal['sigma0', 'gamma0'] = 'gamma0',
                   resolution: Literal[10, 20, 30] = 30,
                   scale: Literal['amplitude', 'decibel', 'power'] = 'power',
                   speckle_filter: bool = False,
                   dem_name: Literal['copernicus'] = 'copernicus') -> Batch:
    """Submit an RTC job

    Builds the job with `prepare_rtc_job` and hands it to `submit_prepared_jobs`.

    Args:
        granule: The granule (scene) to use
        name: A name for the job
        dem_matching: Coregisters SAR data to the DEM, rather than using dead reckoning based on orbit files
        include_dem: Include the DEM file in the product package
        include_inc_map: Include the local incidence angle map in the product package
        include_rgb: Include a false-color RGB decomposition in the product package for dual-pol granules
            (ignored for single-pol granules)
        include_scattering_area: Include the scattering area in the product package
        radiometry: Backscatter coefficient normalization, either by ground area (sigma0) or illuminated area
            projected into the look direction (gamma0)
        resolution: Desired output pixel spacing in meters
        scale: Scale of output image; power, decibel or amplitude
        speckle_filter: Apply an Enhanced Lee speckle filter
        dem_name: Name of the DEM to use for processing. `copernicus` is the only option, and it will use
            the Copernicus GLO-30 Public DEM.

    Returns:
        A Batch object containing the RTC job
    """
    # Forward the arguments explicitly instead of mutating the mapping returned by
    # locals(): that mapping may be shared with the frame and refreshed by the
    # interpreter, so popping 'self' from it in place is fragile.
    job_dict = self.prepare_rtc_job(
        granule=granule,
        name=name,
        dem_matching=dem_matching,
        include_dem=include_dem,
        include_inc_map=include_inc_map,
        include_rgb=include_rgb,
        include_scattering_area=include_scattering_area,
        radiometry=radiometry,
        resolution=resolution,
        scale=scale,
        speckle_filter=speckle_filter,
        dem_name=dem_name,
    )
    return self.submit_prepared_jobs(prepared_jobs=job_dict)
watch(job_or_batch, timeout=10800, interval=60)

Watch jobs until they complete

Parameters:

Name Type Description Default
job_or_batch Union[Batch, Job]

A Batch or Job object of jobs to watch

required
timeout int

How long to wait until exiting in seconds

10800
interval Union[int, float]

How often to check for updates in seconds

60

Returns:

Type Description
Union[Batch, Job]

A Batch or Job object with refreshed watched jobs

Source code in hyp3_sdk/hyp3.py
107
108
109
110
111
112
113
114
115
116
117
118
119
120
@singledispatchmethod
def watch(self, job_or_batch: Union[Batch, Job], timeout: int = 10800,
          interval: Union[int, float] = 60) -> Union[Batch, Job]:
    """Watch jobs until they complete

    This is the fallback of a `singledispatchmethod`: it only raises. Handlers for
    the supported types are presumably registered elsewhere in the class.

    Args:
        job_or_batch: A Batch or Job object of jobs to watch
        timeout: How long to wait until exiting in seconds
        interval: How often to check for updates in seconds

    Returns:
        A Batch or Job object with refreshed watched jobs

    Raises:
        NotImplementedError: always, when dispatch reaches this fallback
    """
    unsupported = type(job_or_batch)
    raise NotImplementedError(f'Cannot watch {unsupported} type object')

jobs

Batch

Source code in hyp3_sdk/jobs.py
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
class Batch:
    """A list-backed collection of HyP3 jobs.

    Supports `+`, `+=`, iteration, `len()`, `in`, `==`, indexing, slicing and
    item assignment/deletion, plus a few helpers that aggregate over every
    job in the collection.
    """

    def __init__(self, jobs: Optional[List['Job']] = None):
        """
        Args:
            jobs: Jobs to hold; an empty batch is created when omitted
        """
        if jobs is None:
            jobs = []
        # The list is stored as given (not copied), so in-place operations
        # such as `+=` also mutate the caller's list.
        self.jobs = jobs

    def __add__(self, other: Union['Job', 'Batch']):
        """Return a new Batch holding this batch's jobs followed by `other`."""
        if isinstance(other, Batch):
            return Batch(self.jobs + other.jobs)
        elif isinstance(other, Job):
            return Batch(self.jobs + [other])
        else:
            raise TypeError(f"unsupported operand type(s) for +: '{type(self)}' and '{type(other)}'")

    def __iadd__(self, other: Union['Job', 'Batch']):
        """Append `other` (a Job, or every job of a Batch) to this batch in place."""
        if isinstance(other, Batch):
            self.jobs += other.jobs
        elif isinstance(other, Job):
            self.jobs += [other]
        else:
            raise TypeError(f"unsupported operand type(s) for +=: '{type(self)}' and '{type(other)}'")
        return self

    def __iter__(self):
        return iter(self.jobs)

    def __len__(self):
        return len(self.jobs)

    def __contains__(self, job: 'Job'):
        return job in self.jobs

    def __eq__(self, other: object):
        """Two batches are equal when their job lists are equal."""
        # Returning NotImplemented (instead of touching `other.jobs`) lets
        # comparisons against unrelated types evaluate to False rather than
        # raising AttributeError.
        if not isinstance(other, Batch):
            return NotImplemented
        return self.jobs == other.jobs

    def __delitem__(self, job: Union[int, slice]):
        """Remove the job(s) at the given index or slice."""
        # `del` accepts slices as well as integers, mirroring __getitem__.
        del self.jobs[job]
        # The `del` statement ignores this value; it is kept for callers that
        # invoke the dunder method directly.
        return self

    def __getitem__(self, index: Union[int, slice]):
        """Return a single job for an integer index, or a new Batch for a slice."""
        if isinstance(index, slice):
            return Batch(self.jobs[index])
        return self.jobs[index]

    def __setitem__(self, index: int, job: 'Job'):
        self.jobs[index] = job
        return self

    def __repr__(self):
        reprs = ", ".join(repr(job) for job in self.jobs)
        return f'Batch([{reprs}])'

    def __str__(self):
        """Summarize the batch as a count of jobs per status."""
        count = self._count_statuses()
        return f'{len(self)} HyP3 Jobs: ' \
               f'{count["SUCCEEDED"]} succeeded, ' \
               f'{count["FAILED"]} failed, ' \
               f'{count["RUNNING"]} running, ' \
               f'{count["PENDING"]} pending.'

    def _count_statuses(self):
        """Return a Counter mapping each status code to its number of jobs."""
        return Counter(job.status_code for job in self.jobs)

    def complete(self) -> bool:
        """
        Returns: True if all jobs are complete, otherwise returns False
        """
        return all(job.complete() for job in self.jobs)

    def succeeded(self) -> bool:
        """
        Returns: True if all jobs have succeeded, otherwise returns False
        """
        return all(job.succeeded() for job in self.jobs)

    def download_files(self, location: Union[Path, str] = '.', create: bool = True) -> List[Path]:
        """Download the files of every job in the batch.

        Jobs whose download raises `HyP3SDKError` are skipped with a printed
        warning rather than aborting the whole batch.

        Args:
            location: Directory location to put files into
            create: Create `location` if it does not point to an existing directory

        Returns: list of Path objects to downloaded files
        """
        downloaded_files = []
        tqdm = get_tqdm_progress_bar()
        for job in tqdm(self.jobs):
            try:
                downloaded_files.extend(job.download_files(location, create))
            except HyP3SDKError as e:
                print(f'Warning: {e} Skipping download for {job}.')
        return downloaded_files

    def any_expired(self) -> bool:
        """Check succeeded jobs for expiration

        Returns: True if at least one job reports itself expired
        """
        for job in self.jobs:
            try:
                if job.expired():
                    return True
            except HyP3SDKError:
                # Best effort: ignore jobs that cannot report expiration.
                continue
        return False

    def filter_jobs(
        self,
        succeeded: bool = True,
        pending: bool = True,
        running: bool = True,
        failed: bool = False,
        include_expired: bool = True,
    ) -> 'Batch':
        """Filter jobs by status. By default, only succeeded, pending,
        and still running jobs will be in the returned batch.

        Args:
            succeeded: Include all succeeded jobs
            pending: Include all pending jobs
            running: Include all running jobs
            failed: Include all failed jobs
            include_expired: Include expired jobs in the result


        Returns:
             batch: A batch object containing jobs matching any of the selected statuses
        """
        filtered_jobs = []

        for job in self.jobs:
            if job.succeeded() and succeeded:
                # Expiration is only checked for succeeded jobs.
                if include_expired or not job.expired():
                    filtered_jobs.append(job)

            elif job.running() and running:
                filtered_jobs.append(job)

            elif job.pending() and pending:
                filtered_jobs.append(job)

            elif job.failed() and failed:
                filtered_jobs.append(job)

        return Batch(filtered_jobs)

    def total_credit_cost(self):
        """Return the sum of `credit_cost` over all jobs, ignoring jobs where it is None."""
        return sum(job.credit_cost for job in self.jobs if job.credit_cost is not None)
any_expired()

Check succeeded jobs for expiration

Source code in hyp3_sdk/jobs.py
245
246
247
248
249
250
251
252
253
def any_expired(self) -> bool:
    """Report whether at least one job in this batch has expired.

    A job whose `expired()` call raises `HyP3SDKError` is skipped.
    """
    for candidate in self.jobs:
        try:
            has_expired = bool(candidate.expired())
        except HyP3SDKError:
            # Best effort: a job that cannot report expiration is ignored.
            has_expired = False
        if has_expired:
            return True
    return False
complete()

Returns: True if all jobs are complete, otherwise returns False

Source code in hyp3_sdk/jobs.py
210
211
212
213
214
215
216
217
def complete(self) -> bool:
    """
    Returns: True when every job reports complete (an empty batch counts as
        complete), otherwise False
    """
    return all(member.complete() for member in self.jobs)
download_files(location='.', create=True)

Parameters:

Name Type Description Default
location Union[Path, str]

Directory location to put files into

'.'
create bool

Create location if it does not point to an existing directory

True

Returns: list of Path objects to downloaded files

Source code in hyp3_sdk/jobs.py
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
def download_files(self, location: Union[Path, str] = '.', create: bool = True) -> List[Path]:
    """Download the files of every job in the batch.

    A job whose download raises `HyP3SDKError` is skipped with a printed
    warning; the remaining jobs are still attempted.

    Args:
        location: Directory location to put files into
        create: Create `location` if it does not point to an existing directory

    Returns: list of Path objects to downloaded files
    """
    collected = []
    progress = get_tqdm_progress_bar()
    for current in progress(self.jobs):
        try:
            collected += current.download_files(location, create)
        except HyP3SDKError as err:
            print(f'Warning: {err} Skipping download for {current}.')
    return collected
filter_jobs(succeeded=True, pending=True, running=True, failed=False, include_expired=True)

Filter jobs by status. By default, only succeeded, pending, and still running jobs will be in the returned batch.

Parameters:

Name Type Description Default
succeeded bool

Include all succeeded jobs

True
pending bool

Include all pending jobs

True
running bool

Include all running jobs

True
failed bool

Include all failed jobs

False
include_expired bool

Include expired jobs in the result

True

Returns:

Name Type Description
batch Batch

A batch object containing the jobs that match any of the selected statuses

Source code in hyp3_sdk/jobs.py
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
def filter_jobs(
    self,
    succeeded: bool = True,
    pending: bool = True,
    running: bool = True,
    failed: bool = False,
    include_expired: bool = True,
) -> 'Batch':
    """Build a new batch containing only the jobs in the requested states.

    With the defaults, succeeded, pending and running jobs are kept and
    failed jobs are dropped.

    Args:
        succeeded: Include all succeeded jobs
        pending: Include all pending jobs
        running: Include all running jobs
        failed: Include all failed jobs
        include_expired: Include expired jobs in the result (only consulted
            for succeeded jobs)

    Returns:
         batch: A batch object containing the jobs whose status is one of
            the selected statuses
    """
    def _selected(candidate) -> bool:
        # The first matching status decides; a succeeded job excluded for
        # being expired is not reconsidered under the other statuses.
        if candidate.succeeded() and succeeded:
            return include_expired or not candidate.expired()
        if candidate.running() and running:
            return True
        if candidate.pending() and pending:
            return True
        return candidate.failed() and failed

    return Batch([candidate for candidate in self.jobs if _selected(candidate)])
succeeded()

Returns: True if all jobs have succeeded, otherwise returns False

Source code in hyp3_sdk/jobs.py
219
220
221
222
223
224
225
226
def succeeded(self) -> bool:
    """
    Returns: True when every job reports success (an empty batch counts as
        succeeded), otherwise False
    """
    return all(member.succeeded() for member in self.jobs)

Job

Source code in hyp3_sdk/jobs.py
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
class Job:
    """A single HyP3 job record with status helpers and file download support."""

    # Attributes included by `to_dict(for_resubmit=True)`.
    _attributes_for_resubmit = {'name', 'job_parameters', 'job_type'}

    def __init__(
            self,
            job_type: str,
            job_id: str,
            request_time: datetime,
            status_code: str,
            user_id: str,
            name: Optional[str] = None,
            job_parameters: Optional[dict] = None,
            files: Optional[List] = None,
            logs: Optional[List] = None,
            browse_images: Optional[List] = None,
            thumbnail_images: Optional[List] = None,
            expiration_time: Optional[datetime] = None,
            processing_times: Optional[List[float]] = None,
            credit_cost: Optional[float] = None,
    ):
        self.job_id = job_id
        self.job_type = job_type
        self.request_time = request_time
        self.status_code = status_code
        self.user_id = user_id
        self.name = name
        self.job_parameters = job_parameters
        self.files = files
        self.logs = logs
        self.browse_images = browse_images
        self.thumbnail_images = thumbnail_images
        self.expiration_time = expiration_time
        self.processing_times = processing_times
        self.credit_cost = credit_cost

    def __repr__(self):
        return f'Job.from_dict({self.to_dict()})'

    def __str__(self):
        return f'HyP3 {self.job_type} job {self.job_id}'

    def __eq__(self, other):
        """Jobs are equal when all of their instance attributes are equal."""
        # Returning NotImplemented lets comparisons against unrelated objects
        # (e.g. `job == 5`) evaluate to False instead of raising
        # AttributeError on the missing `__dict__`.
        if not isinstance(other, Job):
            return NotImplemented
        return self.__dict__ == other.__dict__

    @staticmethod
    def from_dict(input_dict: dict):
        """Build a Job from a dictionary.

        `job_type`, `job_id`, `request_time`, `status_code` and `user_id` are
        required keys; all others are optional. `request_time` and a non-empty
        `expiration_time` are passed through `parse_date`.
        """
        expiration_time = parse_date(input_dict['expiration_time']) if input_dict.get('expiration_time') else None
        return Job(
            job_type=input_dict['job_type'],
            job_id=input_dict['job_id'],
            request_time=parse_date(input_dict['request_time']),
            status_code=input_dict['status_code'],
            user_id=input_dict['user_id'],
            name=input_dict.get('name'),
            job_parameters=input_dict.get('job_parameters'),
            files=input_dict.get('files'),
            logs=input_dict.get('logs'),
            browse_images=input_dict.get('browse_images'),
            thumbnail_images=input_dict.get('thumbnail_images'),
            expiration_time=expiration_time,
            processing_times=input_dict.get('processing_times'),
            credit_cost=input_dict.get('credit_cost'),
        )

    def to_dict(self, for_resubmit: bool = False):
        """Return the job's attributes as a dictionary.

        Attributes whose value is None are omitted and datetimes are rendered
        as ISO 8601 strings with second precision.

        Args:
            for_resubmit: Only include the attributes listed in
                `_attributes_for_resubmit`
        """
        job_dict = {}
        if for_resubmit:
            keys_to_process = Job._attributes_for_resubmit
        else:
            keys_to_process = vars(self).keys()

        for key in keys_to_process:
            value = getattr(self, key)
            if value is not None:
                if isinstance(value, datetime):
                    job_dict[key] = value.isoformat(timespec='seconds')
                else:
                    job_dict[key] = value

        return job_dict

    def succeeded(self) -> bool:
        return self.status_code == 'SUCCEEDED'

    def failed(self) -> bool:
        return self.status_code == 'FAILED'

    def complete(self) -> bool:
        """Return True when the job has either succeeded or failed."""
        return self.succeeded() or self.failed()

    def pending(self) -> bool:
        return self.status_code == 'PENDING'

    def running(self) -> bool:
        return self.status_code == 'RUNNING'

    def expired(self) -> bool:
        """Return True when an expiration time is set and has been reached.

        A job without an expiration time is never considered expired.
        """
        # NOTE(review): the comparison is against an aware UTC "now", so
        # expiration_time is presumably timezone-aware — confirm with callers.
        return self.expiration_time is not None and datetime.now(tz.UTC) >= self.expiration_time

    def download_files(self, location: Union[Path, str] = '.', create: bool = True) -> List[Path]:
        """Download every file listed for this job.

        Args:
            location: Directory location to put files into
            create: Create `location` if it does not point to an existing directory

        Returns: list of Path objects to downloaded files

        Raises:
            HyP3SDKError: if the job has not succeeded, has expired, or a file
                download fails with an HTTP error
            NotADirectoryError: if `create` is False and `location` is not an
                existing directory
        """
        location = Path(location)

        if not self.succeeded():
            raise HyP3SDKError(f'Only succeeded jobs can be downloaded; job is {self.status_code}.')
        if self.expired():
            raise HyP3SDKError(f'Expired jobs cannot be downloaded; '
                               f'job expired {self.expiration_time.isoformat(timespec="seconds")}.')

        if create:
            location.mkdir(parents=True, exist_ok=True)
        elif not location.is_dir():
            raise NotADirectoryError(str(location))

        downloaded_files = []
        # `files` defaults to None in the constructor; treat that as "no files"
        # rather than failing with a TypeError.
        for file in self.files or []:
            download_url = file['url']
            filename = location / file['filename']
            try:
                downloaded_files.append(download_file(download_url, filename, chunk_size=10485760))
            except HTTPError as e:
                # Chain explicitly so the underlying HTTP failure stays visible.
                raise HyP3SDKError(f'Unable to download file: {download_url}') from e
        return downloaded_files
download_files(location='.', create=True)

Parameters:

Name Type Description Default
location Union[Path, str]

Directory location to put files into

'.'
create bool

Create location if it does not point to an existing directory

True

Returns: list of Path objects to downloaded files

Source code in hyp3_sdk/jobs.py
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
def download_files(self, location: Union[Path, str] = '.', create: bool = True) -> List[Path]:
    """Download every file listed for this job.

    Args:
        location: Directory location to put files into
        create: Create `location` if it does not point to an existing directory

    Returns: list of Path objects to downloaded files

    Raises:
        HyP3SDKError: if the job has not succeeded, has expired, or a file
            download fails with an HTTP error
        NotADirectoryError: if `create` is False and `location` is not an
            existing directory
    """
    location = Path(location)

    if not self.succeeded():
        raise HyP3SDKError(f'Only succeeded jobs can be downloaded; job is {self.status_code}.')
    if self.expired():
        raise HyP3SDKError(f'Expired jobs cannot be downloaded; '
                           f'job expired {self.expiration_time.isoformat(timespec="seconds")}.')

    if create:
        location.mkdir(parents=True, exist_ok=True)
    elif not location.is_dir():
        raise NotADirectoryError(str(location))

    downloaded_files = []
    # `files` may be None when the job carries no file listing; treat that as
    # "no files" rather than failing with a TypeError.
    for file in self.files or []:
        download_url = file['url']
        filename = location / file['filename']
        try:
            downloaded_files.append(download_file(download_url, filename, chunk_size=10485760))
        except HTTPError as e:
            # Chain explicitly so the underlying HTTP failure stays visible.
            raise HyP3SDKError(f'Unable to download file: {download_url}') from e
    return downloaded_files

util

Extra utilities for working with HyP3

chunk(itr, n=200)

Split a sequence into small chunks

Parameters:

Name Type Description Default
itr Sequence[Any]

A sequence object to chunk

required
n int

Size of the chunks to return

200
Source code in hyp3_sdk/util.py
43
44
45
46
47
48
49
50
51
52
53
54
def chunk(itr: Sequence[Any], n: int = 200) -> Generator[Sequence[Any], None, None]:
    """Yield consecutive slices of a sequence, each at most `n` items long.

    Because this is a generator, the argument check runs when iteration
    starts rather than when `chunk` is called.

    Args:
        itr: A sequence object to chunk
        n: Size of the chunks to return

    Raises:
        ValueError: if `n` is not an integer of at least 1
    """
    if not (isinstance(n, int) and n >= 1):
        raise ValueError(f'n must be a positive integer: {n}')

    length = len(itr)
    start = 0
    while start < length:
        stop = start + n
        yield itr[start:stop]
        start = stop

download_file(url, filepath, chunk_size=None, retries=2, backoff_factor=1)

Download a file.

Args: `url` – URL of the file to download; `filepath` – location to place the file into; `chunk_size` – size to chunk the download into; `retries` – number of retries to attempt; `backoff_factor` – factor for calculating time between retries.

Returns: `download_path` – the path to the downloaded file.

Source code in hyp3_sdk/util.py
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
def download_file(url: str, filepath: Union[Path, str], chunk_size=None, retries=2, backoff_factor=1) -> Path:
    """Download a file

    The request is retried on HTTP 429, 500, 502, 503 and 504 responses.
    The response is streamed only when `chunk_size` is given.

    Args:
        url: URL of the file to download
        filepath: Location to place file into
        chunk_size: Size to chunk the download into
        retries: Number of retries to attempt
        backoff_factor: Factor for calculating time between retries

    Returns:
        download_path: The path to the downloaded file

    Raises:
        requests.HTTPError: if the final response has an error status
    """
    filepath = Path(filepath)
    retry_strategy = Retry(
        total=retries,
        backoff_factor=backoff_factor,
        status_forcelist=[429, 500, 502, 503, 504],
    )

    # Using the session as a context manager closes it even when the request
    # or the status check raises.
    with requests.Session() as session:
        session.mount('https://', HTTPAdapter(max_retries=retry_strategy))
        session.mount('http://', HTTPAdapter(max_retries=retry_strategy))

        with session.get(url, stream=chunk_size is not None) as response:
            response.raise_for_status()
            tqdm = get_tqdm_progress_bar()
            total = int(response.headers.get('content-length', 0))
            # The file gets its own `with` so it is flushed and closed before
            # returning; the progress wrapper is not relied on to close it.
            with open(filepath, "wb") as raw_file, \
                    tqdm.wrapattr(raw_file, 'write', miniters=1, desc=filepath.name, total=total) as f:
                for block in response.iter_content(chunk_size=chunk_size):
                    if block:
                        f.write(block)

    return filepath

extract_zipped_product(zip_file, delete=True)

Extract a zipped HyP3 product

Extract a zipped HyP3 product to the same directory as the zipped HyP3 product, optionally deleting zip file afterward.

Parameters:

Name Type Description Default
zip_file Union[str, Path]

Zipped HyP3 product to extract

required
delete bool

Delete zip_file after it has been extracted

True

Returns:

Type Description
Path

Path to the HyP3 product folder containing the product files

Source code in hyp3_sdk/util.py
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
def extract_zipped_product(zip_file: Union[str, Path], delete: bool = True) -> Path:
    """Unpack a zipped HyP3 product next to the archive

    The archive contents are extracted into the directory that holds the zip
    file; the archive itself can then be removed.

    Args:
        zip_file: Zipped HyP3 product to extract
        delete: Delete `zip_file` after it has been extracted

    Returns:
        Path to the HyP3 product folder containing the product files, i.e.
        the archive's directory joined with the archive name minus its suffix
    """
    archive_path = Path(zip_file)
    destination = archive_path.parent

    with ZipFile(archive_path) as archive:
        archive.extractall(path=destination)

    if delete:
        archive_path.unlink()

    return destination / archive_path.stem

get_authenticated_session(username, password)

Log into HyP3 using credentials for urs.earthdata.nasa.gov from either the provided credentials or a .netrc file.

Returns:

Type Description
Session

An authenticated HyP3 Session

Source code in hyp3_sdk/util.py
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
def get_authenticated_session(username: str, password: str) -> requests.Session:
    """Open a session and authenticate it against `AUTH_URL`.

    When both `username` and `password` are given they are sent as basic-auth
    credentials; otherwise the request is made without explicit credentials
    (per the error text, a `.netrc` file is expected to supply them).

    Returns:
        An authenticated HyP3 Session

    Raises:
        AuthenticationError: if the final URL carries an `error_msg` query
            parameter together with a `resolution_url` or a profile-update
            notice, or if the response has an HTTP error status
    """
    session = requests.Session()

    explicit_credentials = username is not None and password is not None
    if explicit_credentials:
        response = session.get(AUTH_URL, auth=(username, password))
        failure_message = (
            'Was not able to authenticate with credentials provided\n'
            'This could be due to invalid credentials or a connection error.'
        )
    else:
        response = session.get(AUTH_URL)
        failure_message = (
            'Was not able to authenticate with .netrc file and no credentials provided\n'
            'This could be due to invalid credentials in .netrc or a connection error.'
        )

    # Errors are reported through query parameters on the final response URL.
    final_query = urllib.parse.urlparse(response.url).query
    params = urllib.parse.parse_qs(final_query)
    error_msg = params.get('error_msg')
    resolution_url = params.get('resolution_url')

    if error_msg is not None:
        if resolution_url is not None:
            raise AuthenticationError(f'{error_msg[0]}: {resolution_url[0]}')
        if 'Please update your profile' in error_msg[0]:
            raise AuthenticationError(f'{error_msg[0]}: {PROFILE_URL}')

    try:
        response.raise_for_status()
    except requests.HTTPError:
        raise AuthenticationError(failure_message)

    return session