#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (c) 2019 Satpy developers
#
# This file is part of satpy.
#
# satpy is free software: you can redistribute it and/or modify it under the
# terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later
# version.
#
# satpy is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
# A PARTICULAR PURPOSE. See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along with
# satpy. If not, see <http://www.gnu.org/licenses/>.
"""Interface to TROPOMI L2 Reader.

The TROPOspheric Monitoring Instrument (TROPOMI) is the satellite instrument
on board the Copernicus Sentinel-5 Precursor satellite. It measures key
atmospheric trace gases, such as ozone, nitrogen oxides, sulfur dioxide,
carbon monoxide, methane, and formaldehyde.

Level 2 data products are available via the Copernicus Open Access Hub.
For more information visit the following URL:
http://www.tropomi.eu/data-products/level-2-products
"""
from satpy.readers.netcdf_utils import NetCDF4FileHandler, netCDF4
import logging
import numpy as np
logger = logging.getLogger(__name__)
class TROPOMIL2FileHandler(NetCDF4FileHandler):
    """File handler for TROPOMI L2 netCDF files.

    Besides the datasets configured for the reader, this handler advertises
    every netCDF variable whose shape equals the shape of
    ``PRODUCT/latitude`` as an additional, dynamically discovered dataset.
    """

    @property
    def start_time(self):
        """Get start time, as stored under ``start_time`` in ``filename_info``."""
        return self.filename_info['start_time']

    @property
    def end_time(self):
        """Get end time, falling back to the start time if none is available."""
        return self.filename_info.get('end_time', self.start_time)

    @property
    def platform_shortname(self):
        """Get platform short name, as stored in ``filename_info``."""
        return self.filename_info['platform_shortname']

    @property
    def sensor(self):
        """Get sensor.

        The value is read from the ``/attr/sensor`` entry of the file. If it
        comes back as a numpy array it is converted to a plain ``str``;
        otherwise it is returned unchanged.
        """
        res = self['/attr/sensor']
        if isinstance(res, np.ndarray):
            return str(res.astype(str))
        return res

    def available_datasets(self, configured_datasets=None):
        """Automatically determine datasets provided by this file.

        Args:
            configured_datasets: Iterable of ``(is_avail, ds_info)`` pairs for
                the datasets that are already configured, or ``None``.

        Yields:
            ``(is_avail, ds_info)`` tuples: first the (possibly updated)
            configured datasets, then one entry for every additional variable
            in the file whose shape matches the latitude variable.

        """
        logger.debug("Available_datasets begin...")

        # Determine shape of the geolocation data (lat/lon).
        # Could probably avoid this hardcoding, will think on it
        lat_shape = None
        if 'PRODUCT/latitude' in self.file_content:
            lat_shape = self['PRODUCT/latitude/shape']

        handled_variables = set()

        # update previously configured datasets
        logger.debug("Starting previously configured variables loop...")
        for is_avail, ds_info in (configured_datasets or []):
            # some other file handler knows how to load this
            # NOTE(review): there is no ``continue`` here, so an entry already
            # claimed by another handler is yielded a second time below when
            # this file also contains it -- TODO confirm this is intended.
            if is_avail is not None:
                yield is_avail, ds_info

            var_name = ds_info.get('file_key', ds_info['name'])
            matches = self.file_type_matches(ds_info['file_type'])
            # we can confidently say that we can provide this dataset and can
            # provide more info
            if matches and var_name in self:
                logger.debug("Handling previously configured variable: %s", var_name)
                handled_variables.add(var_name)
                new_info = ds_info.copy()  # don't mess up the above yielded
                yield True, new_info
            elif is_avail is None:
                # if we didn't know how to handle this dataset and no one else did
                # then we should keep it going down the chain
                yield is_avail, ds_info

        # This is where we dynamically add new datasets.
        # We sift through all groups and variables, looking for data matching
        # the geolocation bounds.
        for var_name, val in self.file_content.items():
            # Only evaluate variables
            if not isinstance(val, netCDF4.Variable):
                continue
            logger.debug("Evaluating new variable: %s", var_name)
            var_shape = self[var_name + "/shape"]
            logger.debug("Dims:%s", var_shape)
            if var_shape != lat_shape:
                continue
            logger.debug("Found valid additional dataset: %s", var_name)

            # Skip anything we have already configured
            if var_name in handled_variables:
                logger.debug("Already handled, skipping: %s", var_name)
                continue
            handled_variables.add(var_name)

            # Strip the group path; rsplit also copes with root-level
            # variables whose name contains no '/' separator at all.
            var_name_no_path = var_name.rsplit('/', 1)[-1]
            logger.debug("Using short name of: %s", var_name_no_path)

            # Create new ds_info object
            new_info = {
                'name': var_name_no_path,
                'file_key': var_name,
                'coordinates': ['longitude', 'latitude'],
                'file_type': self.filetype_info['file_type'],
                'resolution': None,
            }
            yield True, new_info

    def _rename_dims(self, data_arr):
        """Normalize dimension names with the rest of Satpy.

        ``ground_pixel`` is renamed to ``x`` and ``scanline`` to ``y``;
        dimensions that are not present are left alone.
        """
        dims_dict = {}
        if 'ground_pixel' in data_arr.dims:
            dims_dict['ground_pixel'] = 'x'
        if 'scanline' in data_arr.dims:
            dims_dict['scanline'] = 'y'
        return data_arr.rename(dims_dict)

    def get_dataset(self, ds_id, ds_info):
        """Get dataset.

        Args:
            ds_id: Identifier of the requested dataset; its ``name`` is used
                as the file key when ``ds_info`` has no ``file_key``.
            ds_info: Dataset information dictionary.

        Returns:
            The squeezed data with fill values masked out (when the variable
            declares a ``_FillValue``) and dimensions renamed to ``x``/``y``.

        """
        logger.debug("Getting data for: %s", ds_id.name)
        file_key = ds_info.get('file_key', ds_id.name)
        data = self[file_key]
        # NOTE(review): ``get_metadata`` is not defined in the visible part of
        # this class; presumably it is provided elsewhere -- TODO confirm.
        data.attrs = self.get_metadata(data, ds_info)
        # Not every variable declares a fill value; only mask when one exists
        # instead of failing with a KeyError.
        fill = data.attrs.pop('_FillValue', None)
        data = data.squeeze()
        if fill is not None:
            data = data.where(data != fill)
        data = self._rename_dims(data)
        return data