Source code for satpy.readers.aapp_l1b

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (c) 2012-2020 Satpy developers
#
# This file is part of satpy.
#
# satpy is free software: you can redistribute it and/or modify it under the
# terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later
# version.
#
# satpy is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
# A PARTICULAR PURPOSE.  See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along with
# satpy.  If not, see <http://www.gnu.org/licenses/>.
"""Reader for aapp level 1b data.

Options for loading:

 - pre_launch_coeffs (False): use pre-launch coefficients if True, operational
   otherwise (if available).

https://nwp-saf.eumetsat.int/site/download/documentation/aapp/NWPSAF-MF-UD-003_Formats_v8.0.pdf
"""
import functools
import logging
from datetime import datetime, timedelta

import dask.array as da
import numpy as np
import xarray as xr
from dask import delayed

from satpy import CHUNK_SIZE
from satpy.readers.file_handlers import BaseFileHandler

LINE_CHUNK = CHUNK_SIZE ** 2 // 2048

logger = logging.getLogger(__name__)

CHANNEL_NAMES = ['1', '2', '3a', '3b', '4', '5']

ANGLES = ['sensor_zenith_angle',
          'solar_zenith_angle',
          'sun_sensor_azimuth_difference_angle']

PLATFORM_NAMES = {4: 'NOAA-15',
                  2: 'NOAA-16',
                  6: 'NOAA-17',
                  7: 'NOAA-18',
                  8: 'NOAA-19',
                  11: 'Metop-B',
                  12: 'Metop-A',
                  13: 'Metop-C',
                  14: 'Metop simulator'}


[docs]def create_xarray(arr): """Create an `xarray.DataArray`.""" res = xr.DataArray(arr, dims=['y', 'x']) return res
[docs]class AVHRRAAPPL1BFile(BaseFileHandler): """Reader for AVHRR L1B files created from the AAPP software.""" def __init__(self, filename, filename_info, filetype_info): """Initialize object information by reading the input file.""" super(AVHRRAAPPL1BFile, self).__init__(filename, filename_info, filetype_info) self.channels = {i: None for i in AVHRR_CHANNEL_NAMES} self.units = {i: 'counts' for i in AVHRR_CHANNEL_NAMES} self._data = None self._header = None self._is3b = None self._is3a = None self._shape = None self.area = None self.sensor = 'avhrr-3' self.read() self.active_channels = self._get_active_channels() self.platform_name = PLATFORM_NAMES.get(self._header['satid'][0], None) if self.platform_name is None: raise ValueError("Unsupported platform ID: %d" % self.header['satid']) def _get_active_channels(self): status = self._get_channel_binary_status_from_header() return self._convert_binary_channel_status_to_activation_dict(status) def _get_channel_binary_status_from_header(self): status = self._header['inststat1'].item() change_line = self._header['statchrecnb'] if change_line > 0: status |= self._header['inststat2'].item() return status @staticmethod def _convert_binary_channel_status_to_activation_dict(status): bits_channels = ((13, '1'), (12, '2'), (11, '3a'), (10, '3b'), (9, '4'), (8, '5')) activated = dict() for bit, channel_name in bits_channels: activated[channel_name] = bool(status >> bit & 1) return activated @property def start_time(self): """Get the time of the first observation.""" return datetime(self._data['scnlinyr'][0], 1, 1) + timedelta( days=int(self._data['scnlindy'][0]) - 1, milliseconds=int(self._data['scnlintime'][0])) @property def end_time(self): """Get the time of the final observation.""" return datetime(self._data['scnlinyr'][-1], 1, 1) + timedelta( days=int(self._data['scnlindy'][-1]) - 1, milliseconds=int(self._data['scnlintime'][-1]))
[docs] def get_dataset(self, key, info): """Get a dataset from the file.""" if key['name'] in CHANNEL_NAMES: if self.active_channels[key['name']]: dataset = self.calibrate(key) else: return None elif key['name'] in ['longitude', 'latitude']: dataset = self.navigate(key['name']) dataset.attrs = info elif key['name'] in ANGLES: dataset = self.get_angles(key['name']) else: raise ValueError("Not a supported dataset: %s", key['name']) self._update_dataset_attributes(dataset, key, info) if not self._shape: self._shape = dataset.shape return dataset
def _update_dataset_attributes(self, dataset, key, info): dataset.attrs.update({'platform_name': self.platform_name, 'sensor': self.sensor}) dataset.attrs.update(key.to_dict()) for meta_key in ('standard_name', 'units'): if meta_key in info: dataset.attrs.setdefault(meta_key, info[meta_key])
[docs] def read(self): """Read the data.""" tic = datetime.now() header = np.memmap(self.filename, dtype=_HEADERTYPE, mode="r", shape=(1, )) data = np.memmap(self.filename, dtype=_SCANTYPE, offset=22016, mode="r") logger.debug("Reading time %s", str(datetime.now() - tic)) self._header = header self._data = data
[docs] def available_datasets(self, configured_datasets=None): """Get the available datasets.""" for _, mda in configured_datasets: if mda['name'] in CHANNEL_NAMES: yield self.active_channels[mda['name']], mda else: yield True, mda
[docs] def get_angles(self, angle_id): """Get sun-satellite viewing angles.""" sunz, satz, azidiff = self._get_all_interpolated_angles() name_to_variable = dict(zip(ANGLES, (satz, sunz, azidiff))) return create_xarray(name_to_variable[angle_id])
@functools.lru_cache(maxsize=10) def _get_all_interpolated_angles(self): sunz40km, satz40km, azidiff40km = self._get_tiepoint_angles_in_degrees() return self._interpolate_arrays(sunz40km, satz40km, azidiff40km) def _get_tiepoint_angles_in_degrees(self): sunz40km = self._data["ang"][:, :, 0] * 1e-2 satz40km = self._data["ang"][:, :, 1] * 1e-2 azidiff40km = self._data["ang"][:, :, 2] * 1e-2 return sunz40km, satz40km, azidiff40km def _interpolate_arrays(self, *input_arrays): lines = input_arrays[0].shape[0] try: interpolator = self._create_40km_interpolator(lines, *input_arrays) except ImportError: logger.warning("Could not interpolate, python-geotiepoints missing.") output_arrays = input_arrays else: output_delayed = delayed(interpolator.interpolate, nout=3)() output_arrays = [da.from_delayed(out_array, (lines, 2048), in_array.dtype) for in_array, out_array in zip(input_arrays, output_delayed)] return output_arrays @staticmethod def _create_40km_interpolator(lines, *arrays_40km): from geotiepoints.interpolator import Interpolator cols40km = np.arange(24, 2048, 40) cols1km = np.arange(2048) rows40km = np.arange(lines) rows1km = np.arange(lines) along_track_order = 1 cross_track_order = 3 satint = Interpolator( arrays_40km, (rows40km, cols40km), (rows1km, cols1km), along_track_order, cross_track_order) return satint
[docs] def navigate(self, coordinate_id): """Get the longitudes and latitudes of the scene.""" lons, lats = self._get_all_interpolated_coordinates() if coordinate_id == 'longitude': return create_xarray(lons) elif coordinate_id == 'latitude': return create_xarray(lats) else: raise KeyError("Coordinate {} unknown.".format(coordinate_id))
@functools.lru_cache(maxsize=10) def _get_all_interpolated_coordinates(self): lons40km, lats40km = self._get_coordinates_in_degrees() return self._interpolate_arrays(lons40km, lats40km) def _get_coordinates_in_degrees(self): lons40km = self._data["pos"][:, :, 1] * 1e-4 lats40km = self._data["pos"][:, :, 0] * 1e-4 return lons40km, lats40km
[docs] def calibrate(self, dataset_id, pre_launch_coeffs=False, calib_coeffs=None): """Calibrate the data.""" if calib_coeffs is None: calib_coeffs = {} units = {'reflectance': '%', 'brightness_temperature': 'K', 'counts': '', 'radiance': 'W*m-2*sr-1*cm ?'} if dataset_id['name'] in ("3a", "3b") and self._is3b is None: # Is it 3a or 3b: self._is3a = da.bitwise_and(da.from_array(self._data['scnlinbit'], chunks=LINE_CHUNK), 3) == 0 self._is3b = da.bitwise_and(da.from_array(self._data['scnlinbit'], chunks=LINE_CHUNK), 3) == 1 try: vis_idx = ['1', '2', '3a'].index(dataset_id['name']) ir_idx = None except ValueError: vis_idx = None ir_idx = ['3b', '4', '5'].index(dataset_id['name']) mask = True if vis_idx is not None: coeffs = calib_coeffs.get('ch' + dataset_id['name']) if dataset_id['name'] == '3a': mask = self._is3a[:, None] ds = create_xarray( _vis_calibrate(self._data, vis_idx, dataset_id['calibration'], pre_launch_coeffs, coeffs, mask=mask)) else: if dataset_id['name'] == '3b': mask = self._is3b[:, None] ds = create_xarray( _ir_calibrate(self._header, self._data, ir_idx, dataset_id['calibration'], mask=mask)) ds.attrs['units'] = units[dataset_id['calibration']] ds.attrs.update(dataset_id._asdict()) return ds
AVHRR_CHANNEL_NAMES = ("1", "2", "3a", "3b", "4", "5") # AAPP 1b header _HEADERTYPE = np.dtype([("siteid", "S3"), ("blank", "S1"), ("l1bversnb", "<i2"), ("l1bversyr", "<i2"), ("l1bversdy", "<i2"), ("reclg", "<i2"), ("blksz", "<i2"), ("hdrcnt", "<i2"), ("filler0", "S6"), ("dataname", "S42"), ("prblkid", "S8"), ("satid", "<i2"), ("instid", "<i2"), ("datatype", "<i2"), ("tipsrc", "<i2"), ("startdatajd", "<i4"), ("startdatayr", "<i2"), ("startdatady", "<i2"), ("startdatatime", "<i4"), ("enddatajd", "<i4"), ("enddatayr", "<i2"), ("enddatady", "<i2"), ("enddatatime", "<i4"), ("cpidsyr", "<i2"), ("cpidsdy", "<i2"), ("filler1", "S8"), # data set quality indicators ("inststat1", "<i4"), ("filler2", "S2"), ("statchrecnb", "<i2"), ("inststat2", "<i4"), ("scnlin", "<i2"), ("callocscnlin", "<i2"), ("misscnlin", "<i2"), ("datagaps", "<i2"), ("okdatafr", "<i2"), ("pacsparityerr", "<i2"), ("auxsyncerrsum", "<i2"), ("timeseqerr", "<i2"), ("timeseqerrcode", "<i2"), ("socclockupind", "<i2"), ("locerrind", "<i2"), ("locerrcode", "<i2"), ("pacsstatfield", "<i2"), ("pacsdatasrc", "<i2"), ("filler3", "S4"), ("spare1", "S8"), ("spare2", "S8"), ("filler4", "S10"), # Calibration ("racalind", "<i2"), ("solarcalyr", "<i2"), ("solarcaldy", "<i2"), ("pcalalgind", "<i2"), ("pcalalgopt", "<i2"), ("scalalgind", "<i2"), ("scalalgopt", "<i2"), ("irttcoef", "<i2", (4, 6)), ("filler5", "<i4", (2, )), # radiance to temperature conversion ("albcnv", "<i4", (2, 3)), ("radtempcnv", "<i4", (3, 3)), ("filler6", "<i4", (3, )), # Navigation ("modelid", "S8"), ("nadloctol", "<i2"), ("locbit", "<i2"), ("filler7", "S2"), ("rollerr", "<i2"), ("pitcherr", "<i2"), ("yawerr", "<i2"), ("epoyr", "<i2"), ("epody", "<i2"), ("epotime", "<i4"), ("smaxis", "<i4"), ("eccen", "<i4"), ("incli", "<i4"), ("argper", "<i4"), ("rascnod", "<i4"), ("manom", "<i4"), ("xpos", "<i4"), ("ypos", "<i4"), ("zpos", "<i4"), ("xvel", "<i4"), ("yvel", "<i4"), ("zvel", "<i4"), ("earthsun", "<i4"), ("filler8", "S16"), # analog telemetry conversion ("pchtemp", "<i2", (5, )), ("reserved1", "<i2"), ("pchtempext", "<i2", (5, )), ("reserved2", "<i2"), ("pchpow", "<i2", (5, )), ("reserved3", "<i2"), ("rdtemp", "<i2", (5, )), ("reserved4", "<i2"), ("bbtemp1", "<i2", (5, )), ("reserved5", "<i2"), ("bbtemp2", "<i2", (5, )), ("reserved6", "<i2"), ("bbtemp3", "<i2", (5, )), ("reserved7", "<i2"), ("bbtemp4", "<i2", (5, )), ("reserved8", "<i2"), ("eleccur", "<i2", (5, )), ("reserved9", "<i2"), ("motorcur", "<i2", (5, )), ("reserved10", "<i2"), ("earthpos", "<i2", (5, )), ("reserved11", "<i2"), ("electemp", "<i2", (5, )), ("reserved12", "<i2"), ("chtemp", "<i2", (5, )), ("reserved13", "<i2"), ("bptemp", "<i2", (5, )), ("reserved14", "<i2"), ("mhtemp", "<i2", (5, )), ("reserved15", "<i2"), ("adcontemp", "<i2", (5, )), ("reserved16", "<i2"), ("d4bvolt", "<i2", (5, )), ("reserved17", "<i2"), ("d5bvolt", "<i2", (5, )), ("reserved18", "<i2"), ("bbtempchn3b", "<i2", (5, )), ("reserved19", "<i2"), ("bbtempchn4", "<i2", (5, )), ("reserved20", "<i2"), ("bbtempchn5", "<i2", (5, )), ("reserved21", "<i2"), ("refvolt", "<i2", (5, )), ("reserved22", "<i2"), ]) # AAPP 1b scanline _SCANTYPE = np.dtype([("scnlin", "<i2"), ("scnlinyr", "<i2"), ("scnlindy", "<i2"), ("clockdrift", "<i2"), ("scnlintime", "<i4"), ("scnlinbit", "<i2"), ("filler0", "S10"), ("qualind", "<i4"), ("scnlinqual", "<i4"), ("calqual", "<i2", (3, )), ("cbiterr", "<i2"), ("filler1", "S8"), # Calibration ("calvis", "<i4", (3, 3, 5)), ("calir", "<i4", (3, 2, 3)), ("filler2", "<i4", (3, )), # Navigation ("navstat", "<i4"), ("attangtime", "<i4"), ("rollang", "<i2"), ("pitchang", "<i2"), ("yawang", "<i2"), ("scalti", "<i2"), ("ang", "<i2", (51, 3)), ("filler3", "<i2", (3, )), ("pos", "<i4", (51, 2)), ("filler4", "<i4", (2, )), ("telem", "<i2", (103, )), ("filler5", "<i2"), ("hrpt", "<i2", (2048, 5)), ("filler6", "<i4", (2, )), # tip minor frame header ("tipmfhd", "<i2", (7, 5)), # cpu telemetry ("cputel", "S6", (2, 5)), ("filler7", "<i2", (67, )), ]) def _vis_calibrate(data, chn, calib_type, pre_launch_coeffs=False, calib_coeffs=None, mask=True): """Calibrate visible channel data. *calib_type* in count, reflectance, radiance. """ # Calibration count to albedo, the calibration is performed separately for # two value ranges. if calib_type not in ['counts', 'radiance', 'reflectance']: raise ValueError('Calibration ' + calib_type + ' unknown!') channel = da.from_array(data["hrpt"][:, :, chn], chunks=(LINE_CHUNK, 2048)) mask &= channel != 0 if calib_type == 'counts': return channel channel = channel.astype(np.float) if calib_type == 'radiance': logger.info("Radiances are not yet supported for " + "the VIS/NIR channels!") if pre_launch_coeffs: coeff_idx = 2 else: # check that coeffs are valid if np.all(data["calvis"][:, chn, 0, 4] == 0): logger.info( "No valid operational coefficients, fall back to pre-launch") coeff_idx = 2 else: coeff_idx = 0 intersection = da.from_array(data["calvis"][:, chn, coeff_idx, 4], chunks=LINE_CHUNK) if calib_coeffs is not None: logger.info("Updating from external calibration coefficients.") slope1 = da.from_array(calib_coeffs[0], chunks=LINE_CHUNK) intercept1 = da.from_array(calib_coeffs[1], chunks=LINE_CHUNK) slope2 = da.from_array(calib_coeffs[2], chunks=LINE_CHUNK) intercept2 = da.from_array(calib_coeffs[3], chunks=LINE_CHUNK) else: slope1 = da.from_array(data["calvis"][:, chn, coeff_idx, 0], chunks=LINE_CHUNK) * 1e-10 intercept1 = da.from_array(data["calvis"][:, chn, coeff_idx, 1], chunks=LINE_CHUNK) * 1e-7 slope2 = da.from_array(data["calvis"][:, chn, coeff_idx, 2], chunks=LINE_CHUNK) * 1e-10 intercept2 = da.from_array(data["calvis"][:, chn, coeff_idx, 3], chunks=LINE_CHUNK) * 1e-7 if chn == 2: slope2 = da.where(slope2 < 0, slope2 + 0.4294967296, slope2) channel = da.where(channel <= intersection[:, None], channel * slope1[:, None] + intercept1[:, None], channel * slope2[:, None] + intercept2[:, None]) channel = channel.clip(min=0) return da.where(mask, channel, np.nan) def _ir_calibrate(header, data, irchn, calib_type, mask=True): """Calibrate for IR bands. *calib_type* in brightness_temperature, radiance, count """ count = da.from_array(data["hrpt"][:, :, irchn + 2], chunks=(LINE_CHUNK, 2048)) if calib_type == 0: return count # Mask unnaturally low values mask &= count != 0 count = count.astype(np.float) k1_ = da.from_array(data['calir'][:, irchn, 0, 0], chunks=LINE_CHUNK) / 1.0e9 k2_ = da.from_array(data['calir'][:, irchn, 0, 1], chunks=LINE_CHUNK) / 1.0e6 k3_ = da.from_array(data['calir'][:, irchn, 0, 2], chunks=LINE_CHUNK) / 1.0e6 # Count to radiance conversion: rad = k1_[:, None] * count * count + k2_[:, None] * count + k3_[:, None] # Suspicious lines mask &= ((k1_ != 0) | (k2_ != 0) | (k3_ != 0))[:, None] if calib_type == 2: mask &= rad > 0.0 return da.where(mask, rad, np.nan) # Central wavenumber: cwnum = header['radtempcnv'][0, irchn, 0] if irchn == 0: cwnum = cwnum / 1.0e2 else: cwnum = cwnum / 1.0e3 bandcor_2 = header['radtempcnv'][0, irchn, 1] / 1e5 bandcor_3 = header['radtempcnv'][0, irchn, 2] / 1e6 ir_const_1 = 1.1910659e-5 ir_const_2 = 1.438833 t_planck = (ir_const_2 * cwnum) / \ np.log(1 + ir_const_1 * cwnum * cwnum * cwnum / rad) # Band corrections applied to t_planck to get correct # brightness temperature for channel: if bandcor_2 < 0: # Post AAPP-v4 tb_ = bandcor_2 + bandcor_3 * t_planck else: # AAPP 1 to 4 tb_ = (t_planck - bandcor_2) / bandcor_3 # Mask unnaturally low values return da.where(mask, tb_, np.nan)