Source code for satpy.readers.seviri_l1b_native

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (c) 2017-2019 Satpy developers
#
# This file is part of satpy.
#
# satpy is free software: you can redistribute it and/or modify it under the
# terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later
# version.
#
# satpy is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
# A PARTICULAR PURPOSE.  See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along with
# satpy.  If not, see <http://www.gnu.org/licenses/>.
"""SEVIRI native format reader.

References:
    MSG Level 1.5 Native Format File Definition
    https://www.eumetsat.int/website/wcm/idc/idcplg?IdcService=GET_FILE&dDocName=PDF_FG15_MSG-NATIVE-FORMAT-15&RevisionSelectionMethod=LatestReleased&Rendition=Web
    MSG Level 1.5 Image Data Format Description
    https://www.eumetsat.int/website/wcm/idc/idcplg?IdcService=GET_FILE&dDocName=PDF_TEN_05105_MSG_IMG_DATA&RevisionSelectionMethod=LatestReleased&Rendition=Web
"""

import logging
from datetime import datetime
import numpy as np

import xarray as xr
import dask.array as da

from satpy import CHUNK_SIZE
from pyresample import geometry

from satpy.readers.file_handlers import BaseFileHandler
from satpy.readers.eum_base import recarray2dict
from satpy.readers.seviri_base import (SEVIRICalibrationHandler,
                                       CHANNEL_NAMES, CALIB, SATNUM,
                                       dec10216, VISIR_NUM_COLUMNS,
                                       VISIR_NUM_LINES, HRV_NUM_COLUMNS,
                                       VIS_CHANNELS)
from satpy.readers.seviri_l1b_native_hdr import (GSDTRecords, native_header,
                                                 native_trailer)


logger = logging.getLogger('native_msg')


[docs]class NativeMSGFileHandler(BaseFileHandler, SEVIRICalibrationHandler): """SEVIRI native format reader. The Level1.5 Image data calibration method can be changed by adding the required mode to the Scene object instantiation kwargs eg kwargs = {"calib_mode": "gsics",} """ def __init__(self, filename, filename_info, filetype_info, calib_mode='nominal'): """Initialize the reader.""" super(NativeMSGFileHandler, self).__init__(filename, filename_info, filetype_info) self.platform_name = None self.calib_mode = calib_mode # Declare required variables. # Assume a full disk file, reset in _read_header if otherwise. self.header = {} self.mda = {} self.mda['is_full_disk'] = True self.trailer = {} # Read header, prepare dask-array, read trailer # Available channels are known only after the header has been read self._read_header() self.dask_array = da.from_array(self._get_memmap(), chunks=(CHUNK_SIZE,)) self._read_trailer() @property def start_time(self): return self.header['15_DATA_HEADER']['ImageAcquisition'][ 'PlannedAcquisitionTime']['TrueRepeatCycleStart'] @property def end_time(self): return self.header['15_DATA_HEADER']['ImageAcquisition'][ 'PlannedAcquisitionTime']['PlannedRepeatCycleEnd'] @staticmethod def _calculate_area_extent(center_point, north, east, south, west, we_offset, ns_offset, column_step, line_step): ll_c = (center_point - west - 0.5 + we_offset) * column_step ll_l = (south - center_point - 0.5 + ns_offset) * line_step ur_c = (center_point - east + 0.5 + we_offset) * column_step ur_l = (north - center_point + 0.5 + ns_offset) * line_step return (ll_c, ll_l, ur_c, ur_l) def _get_data_dtype(self): """Get the dtype of the file based on the actual available channels""" pkhrec = [ ('GP_PK_HEADER', GSDTRecords.gp_pk_header), ('GP_PK_SH1', GSDTRecords.gp_pk_sh1) ] pk_head_dtype = np.dtype(pkhrec) def get_lrec(cols): lrec = [ ("gp_pk", pk_head_dtype), ("version", np.uint8), ("satid", np.uint16), ("time", (np.uint16, 5)), ("lineno", np.uint32), ("chan_id", np.uint8), ("acq_time", (np.uint16, 3)), ("line_validity", np.uint8), ("line_rquality", np.uint8), ("line_gquality", np.uint8), ("line_data", (np.uint8, cols)) ] return lrec # each pixel is 10-bits -> one line of data has 25% more bytes # than the number of columns suggest (10/8 = 1.25) visir_rec = get_lrec(int(self.mda['number_of_columns'] * 1.25)) number_of_visir_channels = len( [s for s in self.mda['channel_list'] if not s == 'HRV']) drec = [('visir', (visir_rec, number_of_visir_channels))] if self.mda['available_channels']['HRV']: hrv_rec = get_lrec(int(self.mda['hrv_number_of_columns'] * 1.25)) drec.append(('hrv', (hrv_rec, 3))) return np.dtype(drec) def _get_memmap(self): """Get the memory map for the SEVIRI data""" with open(self.filename) as fp: data_dtype = self._get_data_dtype() hdr_size = native_header.itemsize return np.memmap(fp, dtype=data_dtype, shape=(self.mda['number_of_lines'],), offset=hdr_size, mode="r") def _read_header(self): """Read the header info""" data = np.fromfile(self.filename, dtype=native_header, count=1) self.header.update(recarray2dict(data)) data15hd = self.header['15_DATA_HEADER'] sec15hd = self.header['15_SECONDARY_PRODUCT_HEADER'] # Set the list of available channels: self.mda['available_channels'] = get_available_channels(self.header) self.mda['channel_list'] = [i for i in CHANNEL_NAMES.values() if self.mda['available_channels'][i]] self.platform_id = data15hd[ 'SatelliteStatus']['SatelliteDefinition']['SatelliteId'] self.mda['platform_name'] = "Meteosat-" + SATNUM[self.platform_id] equator_radius = data15hd['GeometricProcessing'][ 'EarthModel']['EquatorialRadius'] * 1000. north_polar_radius = data15hd[ 'GeometricProcessing']['EarthModel']['NorthPolarRadius'] * 1000. south_polar_radius = data15hd[ 'GeometricProcessing']['EarthModel']['SouthPolarRadius'] * 1000. polar_radius = (north_polar_radius + south_polar_radius) * 0.5 ssp_lon = data15hd['ImageDescription'][ 'ProjectionDescription']['LongitudeOfSSP'] self.mda['projection_parameters'] = {'a': equator_radius, 'b': polar_radius, 'h': 35785831.00, 'ssp_longitude': ssp_lon} north = int(sec15hd['NorthLineSelectedRectangle']['Value']) east = int(sec15hd['EastColumnSelectedRectangle']['Value']) south = int(sec15hd['SouthLineSelectedRectangle']['Value']) west = int(sec15hd['WestColumnSelectedRectangle']['Value']) ncolumns = west - east + 1 nrows = north - south + 1 # check if the file has less rows or columns than # the maximum, if so it is an area of interest file if (nrows < VISIR_NUM_LINES) or (ncolumns < VISIR_NUM_COLUMNS): self.mda['is_full_disk'] = False # If the number of columns in the file is not divisible by 4, # UMARF will add extra columns to the file modulo = ncolumns % 4 padding = 0 if modulo > 0: padding = 4 - modulo cols_visir = ncolumns + padding # Check the VISIR calculated column dimension against # the header information cols_visir_hdr = int(sec15hd['NumberColumnsVISIR']['Value']) if cols_visir_hdr != cols_visir: logger.warning( "Number of VISIR columns from the header is incorrect!") logger.warning("Header: %d", cols_visir_hdr) logger.warning("Calculated: = %d", cols_visir) # HRV Channel - check if the area is reduced in east west # direction as this affects the number of columns in the file cols_hrv_hdr = int(sec15hd['NumberColumnsHRV']['Value']) if ncolumns < VISIR_NUM_COLUMNS: cols_hrv = cols_hrv_hdr else: cols_hrv = int(cols_hrv_hdr / 2) # self.mda represents the 16bit dimensions not 10bit self.mda['number_of_lines'] = int(sec15hd['NumberLinesVISIR']['Value']) self.mda['number_of_columns'] = cols_visir self.mda['hrv_number_of_lines'] = int(sec15hd["NumberLinesHRV"]['Value']) self.mda['hrv_number_of_columns'] = cols_hrv def _read_trailer(self): hdr_size = native_header.itemsize data_size = (self._get_data_dtype().itemsize * self.mda['number_of_lines']) with open(self.filename) as fp: fp.seek(hdr_size + data_size) data = np.fromfile(fp, dtype=native_trailer, count=1) self.trailer.update(recarray2dict(data))
[docs] def get_area_def(self, dsid): a = self.mda['projection_parameters']['a'] b = self.mda['projection_parameters']['b'] h = self.mda['projection_parameters']['h'] lon_0 = self.mda['projection_parameters']['ssp_longitude'] proj_dict = {'a': float(a), 'b': float(b), 'lon_0': float(lon_0), 'h': float(h), 'proj': 'geos', 'units': 'm'} if dsid.name == 'HRV': nlines = self.mda['hrv_number_of_lines'] ncols = self.mda['hrv_number_of_columns'] else: nlines = self.mda['number_of_lines'] ncols = self.mda['number_of_columns'] area = geometry.AreaDefinition( 'some_area_name', "On-the-fly area", 'geosmsg', proj_dict, ncols, nlines, self.get_area_extent(dsid)) return area
[docs] def get_area_extent(self, dsid): data15hd = self.header['15_DATA_HEADER'] sec15hd = self.header['15_SECONDARY_PRODUCT_HEADER'] # check for Earth model as this affects the north-south and # west-east offsets # section 3.1.4.2 of MSG Level 1.5 Image Data Format Description earth_model = data15hd['GeometricProcessing']['EarthModel'][ 'TypeOfEarthModel'] if earth_model == 2: ns_offset = 0 we_offset = 0 elif earth_model == 1: ns_offset = -0.5 we_offset = 0.5 if dsid.name == 'HRV': ns_offset = -1.5 we_offset = 1.5 else: raise NotImplementedError( 'Unrecognised Earth model: {}'.format(earth_model) ) # Calculations assume grid origin is south-east corner # section 7.2.4 of MSG Level 1.5 Image Data Format Description if dsid.name == 'HRV': reference_grid = 'ReferenceGridHRV' else: reference_grid = 'ReferenceGridVIS_IR' origins = {0: 'NW', 1: 'SW', 2: 'SE', 3: 'NE'} grid_origin = data15hd['ImageDescription'][ reference_grid]['GridOrigin'] if grid_origin != 2: msg = 'Grid origin not supported number: {}, {} corner'.format( grid_origin, origins[grid_origin] ) raise NotImplementedError(msg) # When dealing with HRV channel and full disk, area extent is # in two pieces if (dsid.name == 'HRV') and self.mda['is_full_disk']: # NOTE: Implement HRV full disk area_extent # NotImplementedError does not catch this, it must # be used elsewhere already msg = 'HRV full disk area extent not implemented.' raise RuntimeError(msg) # Otherwise area extent is in one piece, corner points are # the same as for VISIR channels, HRV channel is having # three times the amount of columns and rows else: if dsid.name == 'HRV': center_point = HRV_NUM_COLUMNS/2 coeff = 3 else: center_point = VISIR_NUM_COLUMNS/2 coeff = 1 north = coeff * int(sec15hd['NorthLineSelectedRectangle']['Value']) east = coeff * int(sec15hd['EastColumnSelectedRectangle']['Value']) west = coeff * int(sec15hd['WestColumnSelectedRectangle']['Value']) south = coeff * int(sec15hd['SouthLineSelectedRectangle']['Value']) column_step = data15hd['ImageDescription'][ reference_grid]['ColumnDirGridStep'] * 1000.0 line_step = data15hd['ImageDescription'][ reference_grid]['LineDirGridStep'] * 1000.0 area_extent = self._calculate_area_extent( center_point, north, east, south, west, we_offset, ns_offset, column_step, line_step ) return area_extent
[docs] def get_dataset(self, dsid, info, xslice=slice(None), yslice=slice(None)): if dsid.name not in self.mda['channel_list']: raise KeyError('Channel % s not available in the file' % dsid.name) elif dsid.name not in ['HRV']: shape = (self.mda['number_of_lines'], self.mda['number_of_columns']) # Check if there is only 1 channel in the list as a change # is needed in the arrray assignment ie channl id is not present if len(self.mda['channel_list']) == 1: raw = self.dask_array['visir']['line_data'] else: i = self.mda['channel_list'].index(dsid.name) raw = self.dask_array['visir']['line_data'][:, i, :] data = dec10216(raw.flatten()) data = da.flipud(da.fliplr((data.reshape(shape)))) else: shape = (self.mda['hrv_number_of_lines'], self.mda['hrv_number_of_columns']) raw2 = self.dask_array['hrv']['line_data'][:, 2, :] raw1 = self.dask_array['hrv']['line_data'][:, 1, :] raw0 = self.dask_array['hrv']['line_data'][:, 0, :] shape_layer = (self.mda['number_of_lines'], self.mda['hrv_number_of_columns']) data2 = dec10216(raw2.flatten()) data2 = da.flipud(da.fliplr((data2.reshape(shape_layer)))) data1 = dec10216(raw1.flatten()) data1 = da.flipud(da.fliplr((data1.reshape(shape_layer)))) data0 = dec10216(raw0.flatten()) data0 = da.flipud(da.fliplr((data0.reshape(shape_layer)))) data = np.zeros(shape) idx = range(0, shape[0], 3) data[idx, :] = data2 idx = range(1, shape[0], 3) data[idx, :] = data1 idx = range(2, shape[0], 3) data[idx, :] = data0 xarr = xr.DataArray(data, dims=['y', 'x']).where(data != 0).astype(np.float32) if xarr is None: dataset = None else: dataset = self.calibrate(xarr, dsid) dataset.attrs['units'] = info['units'] dataset.attrs['wavelength'] = info['wavelength'] dataset.attrs['standard_name'] = info['standard_name'] dataset.attrs['platform_name'] = self.mda['platform_name'] dataset.attrs['sensor'] = 'seviri' dataset.attrs['orbital_parameters'] = { 'projection_longitude': self.mda['projection_parameters']['ssp_longitude'], 'projection_latitude': 0., 'projection_altitude': self.mda['projection_parameters']['h']} return dataset
[docs] def calibrate(self, data, dsid): """Calibrate the data.""" tic = datetime.now() data15hdr = self.header['15_DATA_HEADER'] calibration = dsid.calibration channel = dsid.name # even though all the channels may not be present in the file, # the header does have calibration coefficients for all the channels # hence, this channel index needs to refer to full channel list i = list(CHANNEL_NAMES.values()).index(channel) if calibration == 'counts': return data if calibration in ['radiance', 'reflectance', 'brightness_temperature']: # determine the required calibration coefficients to use # for the Level 1.5 Header if (self.calib_mode.upper() != 'GSICS' and self.calib_mode.upper() != 'NOMINAL'): raise NotImplementedError( 'Unknown Calibration mode : Please check') # NB GSICS doesn't have calibration coeffs for VIS channels if (self.calib_mode.upper() != 'GSICS' or channel in VIS_CHANNELS): coeffs = data15hdr[ 'RadiometricProcessing']['Level15ImageCalibration'] gain = coeffs['CalSlope'][i] offset = coeffs['CalOffset'][i] else: coeffs = data15hdr[ 'RadiometricProcessing']['MPEFCalFeedback'] gain = coeffs['GSICSCalCoeff'][i] offset = coeffs['GSICSOffsetCount'][i] offset = offset * gain res = self._convert_to_radiance(data, gain, offset) if calibration == 'reflectance': solar_irradiance = CALIB[self.platform_id][channel]["F"] res = self._vis_calibrate(res, solar_irradiance) elif calibration == 'brightness_temperature': cal_type = data15hdr['ImageDescription'][ 'Level15ImageProduction']['PlannedChanProcessing'][i] res = self._ir_calibrate(res, channel, cal_type) logger.debug("Calibration time " + str(datetime.now() - tic)) return res
[docs]def get_available_channels(header): """Get the available channels from the header information""" chlist_str = header['15_SECONDARY_PRODUCT_HEADER'][ 'SelectedBandIDs']['Value'] retv = {} for idx, char in zip(range(12), chlist_str): retv[CHANNEL_NAMES[idx + 1]] = (char == 'X') return retv