Source code for satpy.readers.abi_l1b

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (c) 2016-2019 Satpy developers
#
# This file is part of satpy.
#
# satpy is free software: you can redistribute it and/or modify it under the
# terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later
# version.
#
# satpy is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
# A PARTICULAR PURPOSE.  See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along with
# satpy.  If not, see <http://www.gnu.org/licenses/>.
"""Advance Baseline Imager reader for the Level 1b format

The files read by this reader are described in the official PUG document:

    https://www.goes-r.gov/users/docs/PUG-L1b-vol3.pdf

"""

import logging
from datetime import datetime

import numpy as np
import xarray as xr

from pyresample import geometry
from satpy.readers.file_handlers import BaseFileHandler
from satpy import CHUNK_SIZE

logger = logging.getLogger(__name__)

PLATFORM_NAMES = {
    'G16': 'GOES-16',
    'G17': 'GOES-17',
}


[docs]class NC_ABI_L1B(BaseFileHandler): def __init__(self, filename, filename_info, filetype_info): super(NC_ABI_L1B, self).__init__(filename, filename_info, filetype_info) # xarray's default netcdf4 engine self.nc = xr.open_dataset(self.filename, decode_cf=True, mask_and_scale=False, chunks={'x': CHUNK_SIZE, 'y': CHUNK_SIZE}) self.nc = self.nc.rename({'t': 'time'}) platform_shortname = filename_info['platform_shortname'] self.platform_name = PLATFORM_NAMES.get(platform_shortname) self.sensor = 'abi' self.nlines, self.ncols = self.nc["Rad"].shape self.coords = {} def __getitem__(self, item): """Wrapper around `self.nc[item]`. Some datasets use a 32-bit float scaling factor like the 'x' and 'y' variables which causes inaccurate unscaled data values. This method forces the scale factor to a 64-bit float first. """ data = self.nc[item] attrs = data.attrs factor = data.attrs.get('scale_factor') offset = data.attrs.get('add_offset') fill = data.attrs.get('_FillValue') if fill is not None: data = data.where(data != fill) if factor is not None: # make sure the factor is a 64-bit float # can't do this in place since data is most likely uint16 # and we are making it a 64-bit float data = data * float(factor) + offset data.attrs = attrs # handle coordinates (and recursive fun) new_coords = {} # 'time' dimension causes issues in other processing # 'x_image' and 'y_image' are confusing to some users and unnecessary # 'x' and 'y' will be overwritten by base class AreaDefinition for coord_name in ('x_image', 'y_image', 'time', 'x', 'y'): if coord_name in data.coords: del data.coords[coord_name] if item in data.coords: self.coords[item] = data for coord_name in data.coords.keys(): if coord_name not in self.coords: self.coords[coord_name] = self[coord_name] new_coords[coord_name] = self.coords[coord_name] data.coords.update(new_coords) return data
[docs] def get_shape(self, key, info): """Get the shape of the data.""" return self.nlines, self.ncols
[docs] def get_dataset(self, key, info): """Load a dataset.""" logger.debug('Reading in get_dataset %s.', key.name) radiances = self['Rad'] if key.calibration == 'reflectance': logger.debug("Calibrating to reflectances") res = self._vis_calibrate(radiances) elif key.calibration == 'brightness_temperature': logger.debug("Calibrating to brightness temperatures") res = self._ir_calibrate(radiances) elif key.calibration != 'radiance': raise ValueError("Unknown calibration '{}'".format(key.calibration)) else: res = radiances # convert to satpy standard units if res.attrs['units'] == '1': res *= 100 res.attrs['units'] = '%' res.attrs.update({'platform_name': self.platform_name, 'sensor': self.sensor, 'satellite_latitude': float(self['nominal_satellite_subpoint_lat']), 'satellite_longitude': float(self['nominal_satellite_subpoint_lon']), 'satellite_altitude': float(self['nominal_satellite_height'])}) # Add orbital parameters projection = self.nc["goes_imager_projection"] res.attrs['orbital_parameters'] = { 'projection_longitude': float(projection.attrs['longitude_of_projection_origin']), 'projection_latitude': float(projection.attrs['latitude_of_projection_origin']), 'projection_altitude': float(projection.attrs['perspective_point_height']), 'satellite_nominal_latitude': float(self['nominal_satellite_subpoint_lat']), 'satellite_nominal_longitude': float(self['nominal_satellite_subpoint_lon']), 'satellite_nominal_altitude': float(self['nominal_satellite_height']), 'yaw_flip': bool(self['yaw_flip_flag']), } res.attrs.update(key.to_dict()) # remove attributes that could be confusing later res.attrs.pop('_FillValue', None) res.attrs.pop('scale_factor', None) res.attrs.pop('add_offset', None) res.attrs.pop('_Unsigned', None) res.attrs.pop('ancillary_variables', None) # Can't currently load DQF # add in information from the filename that may be useful to the user for key in ('observation_type', 'scene_abbr', 'scan_mode', 'platform_shortname'): res.attrs[key] = self.filename_info[key] # copy global attributes to metadata for key in ('scene_id', 'orbital_slot', 'instrument_ID', 'production_site', 'timeline_ID'): res.attrs[key] = self.nc.attrs.get(key) # only include these if they are present for key in ('fusion_args',): if key in self.nc.attrs: res.attrs[key] = self.nc.attrs[key] return res
[docs] def get_area_def(self, key): """Get the area definition of the data at hand.""" projection = self.nc["goes_imager_projection"] a = projection.attrs['semi_major_axis'] h = projection.attrs['perspective_point_height'] b = projection.attrs['semi_minor_axis'] lon_0 = projection.attrs['longitude_of_projection_origin'] sweep_axis = projection.attrs['sweep_angle_axis'][0] # x and y extents in m h = float(h) x = self['x'] y = self['y'] x_l = h * x[0] x_r = h * x[-1] y_l = h * y[-1] y_u = h * y[0] x_half = (x_r - x_l) / (self.ncols - 1) / 2. y_half = (y_u - y_l) / (self.nlines - 1) / 2. area_extent = (x_l - x_half, y_l - y_half, x_r + x_half, y_u + y_half) proj_dict = {'a': float(a), 'b': float(b), 'lon_0': float(lon_0), 'h': h, 'proj': 'geos', 'units': 'm', 'sweep': sweep_axis} area = geometry.AreaDefinition( self.nc.attrs.get('orbital_slot', 'abi_geos'), self.nc.attrs.get('spatial_resolution', 'ABI L1B file area'), 'abi_geos', proj_dict, self.ncols, self.nlines, np.asarray(area_extent)) return area
def _vis_calibrate(self, data): """Calibrate visible channels to reflectance.""" solar_irradiance = self['esun'] esd = self["earth_sun_distance_anomaly_in_AU"].astype(float) factor = np.pi * esd * esd / solar_irradiance res = data * factor res.attrs = data.attrs res.attrs['units'] = '1' res.attrs['long_name'] = 'Bidirectional Reflectance' res.attrs['standard_name'] = 'toa_bidirectional_reflectance' return res def _ir_calibrate(self, data): """Calibrate IR channels to BT.""" fk1 = float(self["planck_fk1"]) fk2 = float(self["planck_fk2"]) bc1 = float(self["planck_bc1"]) bc2 = float(self["planck_bc2"]) res = (fk2 / np.log(fk1 / data + 1) - bc1) / bc2 res.attrs = data.attrs res.attrs['units'] = 'K' res.attrs['long_name'] = 'Brightness Temperature' res.attrs['standard_name'] = 'toa_brightness_temperature' return res @property def start_time(self): return datetime.strptime(self.nc.attrs['time_coverage_start'], '%Y-%m-%dT%H:%M:%S.%fZ') @property def end_time(self): return datetime.strptime(self.nc.attrs['time_coverage_end'], '%Y-%m-%dT%H:%M:%S.%fZ') def __del__(self): try: self.nc.close() except (IOError, OSError, AttributeError): pass