Source code for satpy.readers.viirs_compact

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (c) 2014-2019 Satpy developers
#
# This file is part of satpy.
#
# satpy is free software: you can redistribute it and/or modify it under the
# terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later
# version.
#
# satpy is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
# A PARTICULAR PURPOSE.  See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along with
# satpy.  If not, see <http://www.gnu.org/licenses/>.
"""Compact viirs format.

.. note:: It should be possible to further enhance this reader by performing the
   interpolation of the angles and lon/lats at the native dask chunk level.

"""

import logging
from datetime import datetime, timedelta

import h5py
import numpy as np
import xarray as xr
import dask.array as da

from satpy.readers.file_handlers import BaseFileHandler
from satpy.readers.utils import np2str
from satpy.utils import angle2xyz, lonlat2xyz, xyz2angle, xyz2lonlat
from satpy import CHUNK_SIZE

try:
    import tables
except ImportError:
    tables = None

chans_dict = {"M01": "M1",
              "M02": "M2",
              "M03": "M3",
              "M04": "M4",
              "M05": "M5",
              "M06": "M6",
              "M07": "M7",
              "M08": "M8",
              "M09": "M9",
              "M10": "M10",
              "M11": "M11",
              "M12": "M12",
              "M13": "M13",
              "M14": "M14",
              "M15": "M15",
              "M16": "M16",
              "DNB": "DNB"}

logger = logging.getLogger(__name__)

c = 299792458  # m.s-1
h = 6.6260755e-34  # m2kg.s-1
k = 1.380658e-23  # m2kg.s-2.K-1

short_names = {'NPP': 'Suomi-NPP',
               'J01': 'NOAA-20',
               'J02': 'NOAA-21'}


[docs]class VIIRSCompactFileHandler(BaseFileHandler): """A file handler class for VIIRS compact format.""" def __init__(self, filename, filename_info, filetype_info): """Initialize the reader.""" super(VIIRSCompactFileHandler, self).__init__(filename, filename_info, filetype_info) self.h5f = h5py.File(self.filename, "r") self.finfo = filename_info self.lons = None self.lats = None if filetype_info['file_type'] == 'compact_m': self.ch_type = 'MOD' elif filetype_info['file_type'] == 'compact_dnb': self.ch_type = 'DNB' else: raise IOError('Compact Viirs file type not recognized.') geo_data = self.h5f["Data_Products"]["VIIRS-%s-GEO" % self.ch_type]["VIIRS-%s-GEO_Gran_0" % self.ch_type] self.min_lat = np.asscalar(geo_data.attrs['South_Bounding_Coordinate']) self.max_lat = np.asscalar(geo_data.attrs['North_Bounding_Coordinate']) self.min_lon = np.asscalar(geo_data.attrs['West_Bounding_Coordinate']) self.max_lon = np.asscalar(geo_data.attrs['East_Bounding_Coordinate']) self.switch_to_cart = ((abs(self.max_lon - self.min_lon) > 90) or (max(abs(self.min_lat), abs(self.max_lat)) > 60)) self.scans = self.h5f["All_Data"]["NumberOfScans"][0] self.geostuff = self.h5f["All_Data"]['VIIRS-%s-GEO_All' % self.ch_type] self.c_align = da.from_array(self.geostuff["AlignmentCoefficient"])[ np.newaxis, np.newaxis, :, np.newaxis] self.c_exp = da.from_array(self.geostuff["ExpansionCoefficient"])[ np.newaxis, np.newaxis, :, np.newaxis] for key in self.h5f["All_Data"].keys(): if key.startswith("VIIRS") and key.endswith("SDR_All"): channel = key.split('-')[1] break # FIXME: this supposes there is only one tiepoint zone in the # track direction self.scan_size = self.h5f["All_Data/VIIRS-%s-SDR_All" % channel].attrs["TiePointZoneSizeTrack"] self.scan_size = np.asscalar(self.scan_size) self.track_offset = self.h5f["All_Data/VIIRS-%s-SDR_All" % channel].attrs["PixelOffsetTrack"] self.scan_offset = self.h5f["All_Data/VIIRS-%s-SDR_All" % channel].attrs["PixelOffsetScan"] try: self.group_locations = self.geostuff[ "TiePointZoneGroupLocationScanCompact"].value except KeyError: self.group_locations = [0] self.tpz_sizes = self.h5f["All_Data/VIIRS-%s-SDR_All" % channel].attrs["TiePointZoneSizeScan"] self.nb_tpzs = self.geostuff["NumberOfTiePointZonesScan"].value self.cache = {} self.mda = {} short_name = np2str(self.h5f.attrs['Platform_Short_Name']) self.mda['platform_name'] = short_names.get(short_name, short_name) self.mda['sensor'] = 'viirs'
[docs] def get_dataset(self, key, info): """Load a dataset.""" logger.debug('Reading %s.', key.name) if key.name in chans_dict: m_data = self.read_dataset(key, info) else: m_data = self.read_geo(key, info) m_data.attrs.update(info) return m_data
[docs] def get_bounding_box(self): """Get the bounding box of the data.""" for key in self.h5f["Data_Products"].keys(): if key.startswith("VIIRS") and key.endswith("GEO"): lats = self.h5f["Data_Products"][key][ key + '_Gran_0'].attrs['G-Ring_Latitude'] lons = self.h5f["Data_Products"][key][ key + '_Gran_0'].attrs['G-Ring_Longitude'] break else: raise KeyError('Cannot find bounding coordinates!') return lons.ravel(), lats.ravel()
@property def start_time(self): """Get the start time.""" return self.finfo['start_time'] @property def end_time(self): """Get the end time.""" end_time = datetime.combine(self.start_time.date(), self.finfo['end_time'].time()) if end_time < self.start_time: end_time += timedelta(days=1) return end_time
[docs] def read_geo(self, key, info): """Read angles.""" pairs = {('satellite_azimuth_angle', 'satellite_zenith_angle'): ("SatelliteAzimuthAngle", "SatelliteZenithAngle"), ('solar_azimuth_angle', 'solar_zenith_angle'): ("SolarAzimuthAngle", "SolarZenithAngle"), ('dnb_solar_azimuth_angle', 'dnb_solar_zenith_angle'): ("SolarAzimuthAngle", "SolarZenithAngle"), ('dnb_lunar_azimuth_angle', 'dnb_lunar_zenith_angle'): ("LunarAzimuthAngle", "LunarZenithAngle"), } for pair, fkeys in pairs.items(): if key.name in pair: if (self.cache.get(pair[0]) is None or self.cache.get(pair[1]) is None): angles = self.angles(*fkeys) self.cache[pair[0]], self.cache[pair[1]] = angles if key.name == pair[0]: return xr.DataArray(self.cache[pair[0]], name=key.name, attrs=self.mda, dims=('y', 'x')) else: return xr.DataArray(self.cache[pair[1]], name=key.name, attrs=self.mda, dims=('y', 'x')) if info.get('standard_name') in ['latitude', 'longitude']: if self.lons is None or self.lats is None: self.lons, self.lats = self.navigate() mda = self.mda.copy() mda.update(info) if info['standard_name'] == 'longitude': return xr.DataArray(self.lons, attrs=mda, dims=('y', 'x')) else: return xr.DataArray(self.lats, attrs=mda, dims=('y', 'x')) if key.name == 'dnb_moon_illumination_fraction': mda = self.mda.copy() mda.update(info) return xr.DataArray(da.from_array(self.geostuff["MoonIllumFraction"]), attrs=info)
[docs] def read_dataset(self, dataset_key, info): """Read a dataset.""" h5f = self.h5f channel = chans_dict[dataset_key.name] chan_dict = dict([(key.split("-")[1], key) for key in h5f["All_Data"].keys() if key.startswith("VIIRS")]) h5rads = h5f["All_Data"][chan_dict[channel]]["Radiance"] chunks = h5rads.chunks or CHUNK_SIZE rads = xr.DataArray(da.from_array(h5rads, chunks=chunks), name=dataset_key.name, dims=['y', 'x']).astype(np.float32) h5attrs = h5rads.attrs scans = h5f["All_Data"]["NumberOfScans"][0] rads = rads[:scans * 16, :] # if channel in ("M9", ): # arr = rads[:scans * 16, :].astype(np.float32) # arr[arr > 65526] = np.nan # arr = np.ma.masked_array(arr, mask=arr_mask) # else: # arr = np.ma.masked_greater(rads[:scans * 16, :].astype(np.float32), # 65526) rads = rads.where(rads <= 65526) try: rads = xr.where(rads <= h5attrs['Threshold'], rads * h5attrs['RadianceScaleLow'] + h5attrs['RadianceOffsetLow'], rads * h5attrs['RadianceScaleHigh'] + h5attrs['RadianceOffsetHigh']) except (KeyError, AttributeError): logger.info("Missing attribute for scaling of %s.", channel) pass unit = "W m-2 sr-1 μm-1" if dataset_key.calibration == 'counts': raise NotImplementedError("Can't get counts from this data") if dataset_key.calibration in ['reflectance', 'brightness_temperature']: # do calibrate try: # First guess: VIS or NIR data a_vis = h5attrs['EquivalentWidth'] b_vis = h5attrs['IntegratedSolarIrradiance'] dse = h5attrs['EarthSunDistanceNormalised'] rads *= 100 * np.pi * a_vis / b_vis * (dse**2) unit = "%" except KeyError: # Maybe it's IR data? try: a_ir = h5attrs['BandCorrectionCoefficientA'] b_ir = h5attrs['BandCorrectionCoefficientB'] lambda_c = h5attrs['CentralWaveLength'] rads *= 1e6 rads = (h * c) / (k * lambda_c * np.log(1 + (2 * h * c ** 2) / ((lambda_c ** 5) * rads))) rads *= a_ir rads += b_ir unit = "K" except KeyError: logger.warning("Calibration failed.") elif dataset_key.calibration != 'radiance': raise ValueError("Calibration parameter should be radiance, " "reflectance or brightness_temperature") rads = rads.clip(min=0) rads.attrs = self.mda rads.attrs['units'] = unit return rads
[docs] def navigate(self): """Generate lon and lat datasets.""" all_lon = da.from_array(self.geostuff["Longitude"]) all_lat = da.from_array(self.geostuff["Latitude"]) res = [] param_start = 0 for tpz_size, nb_tpz, start in zip(self.tpz_sizes, self.nb_tpzs, self.group_locations): lon = all_lon[:, start:start + nb_tpz + 1] lat = all_lat[:, start:start + nb_tpz + 1] c_align = self.c_align[:, :, param_start:param_start + nb_tpz, :] c_exp = self.c_exp[:, :, param_start:param_start + nb_tpz, :] param_start += nb_tpz expanded = [] if self.switch_to_cart: arrays = lonlat2xyz(lon, lat) else: arrays = (lon, lat) for data in arrays: expanded.append(expand_array( data, self.scans, c_align, c_exp, self.scan_size, tpz_size, nb_tpz, self.track_offset, self.scan_offset)) if self.switch_to_cart: res.append(xyz2lonlat(*expanded)) else: res.append(expanded) lons, lats = zip(*res) return da.hstack(lons), da.hstack(lats)
[docs] def angles(self, azi_name, zen_name): """Compute the angle datasets.""" all_lat = da.from_array(self.geostuff["Latitude"]) all_zen = da.from_array(self.geostuff[zen_name]) all_azi = da.from_array(self.geostuff[azi_name]) res = [] param_start = 0 for tpz_size, nb_tpz, start in zip(self.tpz_sizes, self.nb_tpzs, self.group_locations): lat = all_lat[:, start:start + nb_tpz + 1] zen = all_zen[:, start:start + nb_tpz + 1] azi = all_azi[:, start:start + nb_tpz + 1] c_align = self.c_align[:, :, param_start:param_start + nb_tpz, :] c_exp = self.c_exp[:, :, param_start:param_start + nb_tpz, :] param_start += nb_tpz if (np.max(azi) - np.min(azi) > 5) or (np.min(zen) < 10) or ( np.max(abs(lat)) > 80): expanded = [] for data in angle2xyz(azi, zen): expanded.append(expand_array( data, self.scans, c_align, c_exp, self.scan_size, tpz_size, nb_tpz, self.track_offset, self.scan_offset)) azi, zen = xyz2angle(*expanded) res.append((azi, zen)) else: expanded = [] for data in (azi, zen): expanded.append(expand_array( data, self.scans, c_align, c_exp, self.scan_size, tpz_size, nb_tpz, self.track_offset, self.scan_offset)) res.append(expanded) azi, zen = zip(*res) return da.hstack(azi), da.hstack(zen)
[docs]def expand_array(data, scans, c_align, c_exp, scan_size=16, tpz_size=16, nties=200, track_offset=0.5, scan_offset=0.5): """Expand *data* according to alignment and expansion.""" nties = np.asscalar(nties) tpz_size = np.asscalar(tpz_size) s_scan, s_track = da.meshgrid(np.arange(nties * tpz_size), np.arange(scans * scan_size)) s_track = (s_track.reshape(scans, scan_size, nties, tpz_size) % scan_size + track_offset) / scan_size s_scan = (s_scan.reshape(scans, scan_size, nties, tpz_size) % tpz_size + scan_offset) / tpz_size a_scan = s_scan + s_scan * (1 - s_scan) * c_exp + s_track * ( 1 - s_track) * c_align a_track = s_track data_a = data[:scans * 2:2, np.newaxis, :-1, np.newaxis] data_b = data[:scans * 2:2, np.newaxis, 1:, np.newaxis] data_c = data[1:scans * 2:2, np.newaxis, 1:, np.newaxis] data_d = data[1:scans * 2:2, np.newaxis, :-1, np.newaxis] fdata = ((1 - a_track) * ((1 - a_scan) * data_a + a_scan * data_b) + a_track * ( (1 - a_scan) * data_d + a_scan * data_c)) return fdata.reshape(scans * scan_size, nties * tpz_size)