Source code for sfepy.terms.terms

import re
from copy import copy

import numpy as nm

from sfepy.base.base import (as_float_or_complex, get_default, assert_,
                             Container, Struct, basestr, goptions)
from sfepy.base.compat import in1d

# Used for imports in term files.
from sfepy.terms.extmods import terms

from sfepy.linalg import split_range

_match_args = re.compile('^([^\(\}]*)\((.*)\)$').match
_match_virtual = re.compile('^virtual$').match
_match_state = re.compile('^state(_[_a-zA-Z0-9]+)?$').match
_match_parameter = re.compile('^parameter(_[_a-zA-Z0-9]+)?$').match
_match_material = re.compile('^material(_[_a-zA-Z0-9]+)?$').match
_match_material_opt = re.compile('^opt_material(_[_a-zA-Z0-9]+)?$').match
_match_material_root = re.compile('(.+)\.(.*)').match

[docs]def get_arg_kinds(arg_types): """ Translate `arg_types` of a Term to a canonical form. Parameters ---------- arg_types : tuple of strings The term argument types, as given in the `arg_types` attribute. Returns ------- arg_kinds : list of strings The argument kinds - one of 'virtual_variable', 'state_variable', 'parameter_variable', 'opt_material', 'user'. """ arg_kinds = [] for ii, arg_type in enumerate(arg_types): if _match_virtual(arg_type): arg_kinds.append('virtual_variable') elif _match_state(arg_type): arg_kinds.append('state_variable') elif _match_parameter(arg_type): arg_kinds.append('parameter_variable') elif _match_material(arg_type): arg_kinds.append('material') elif _match_material_opt(arg_type): arg_kinds.append('opt_material') if ii > 0: msg = 'opt_material at position %d, must be at 0!' % ii raise ValueError(msg) else: arg_kinds.append('user') return arg_kinds
[docs]def get_shape_kind(integration): """ Get data shape kind for given integration type. """ if integration == 'surface': shape_kind = 'surface' elif integration in ('volume', 'plate', 'surface_extra'): shape_kind = 'volume' elif integration == 'point': shape_kind = 'point' else: raise NotImplementedError('unsupported term integration! (%s)' % integration) return shape_kind
[docs]def split_complex_args(args): """ Split complex arguments to real and imaginary parts. Returns ------- newargs : dictionary Dictionary with lists corresponding to `args` such that each argument of numpy.complex128 data type is split to its real and imaginary part. The output depends on the number of complex arguments in 'args': - 0: list (key 'r') identical to input one - 1: two lists with keys 'r', 'i' corresponding to real and imaginary parts - 2: output dictionary contains four lists: - 'r' - real(arg1), real(arg2) - 'i' - imag(arg1), imag(arg2) - 'ri' - real(arg1), imag(arg2) - 'ir' - imag(arg1), real(arg2) """ newargs = {} cai = [] for ii, arg in enumerate(args): if isinstance(arg, nm.ndarray) and (arg.dtype == nm.complex128): cai.append(ii) if len(cai) > 0: newargs['r'] = list(args[:]) newargs['i'] = list(args[:]) arg1 = cai[0] newargs['r'][arg1] = args[arg1].real.copy() newargs['i'][arg1] = args[arg1].imag.copy() if len(cai) == 2: arg2 = cai[1] newargs['r'][arg2] = args[arg2].real.copy() newargs['i'][arg2] = args[arg2].imag.copy() newargs['ri'] = list(args[:]) newargs['ir'] = list(args[:]) newargs['ri'][arg1] = newargs['r'][arg1] newargs['ri'][arg2] = newargs['i'][arg2] newargs['ir'][arg1] = newargs['i'][arg1] newargs['ir'][arg2] = newargs['r'][arg2] elif len(cai) > 2: raise NotImplementedError('more than 2 complex arguments! (%d)' % len(cai)) else: newargs['r'] = args[:] return newargs
[docs]def vector_chunk_generator(total_size, chunk_size, shape_in, zero=False, set_shape=True, dtype=nm.float64): if not chunk_size: chunk_size = total_size shape = list(shape_in) sizes = split_range(total_size, chunk_size) ii = nm.array(0, dtype=nm.int32) for size in sizes: chunk = nm.arange(size, dtype=nm.int32) + ii if set_shape: shape[0] = size if zero: out = nm.zeros(shape, dtype=dtype) else: out = nm.empty(shape, dtype=dtype) yield out, chunk ii += size
[docs]def create_arg_parser(): from pyparsing import Literal, Word, delimitedList, Group, \ StringStart, StringEnd, Optional, nums, alphas, alphanums inumber = Word("+-" + nums, nums) history = Optional(Literal('[').suppress() + inumber + Literal(']').suppress(), default=0)("history") history.setParseAction(lambda str, loc, toks: int(toks[0])) variable = Group(Word(alphas, alphanums + '._') + history) derivative = Group(Literal('d') + variable\ + Literal('/').suppress() + Literal('dt')) trace = Group(Literal('tr') + Literal('(').suppress() + variable \ + Literal(')').suppress()) generalized_var = derivative | trace | variable args = StringStart() + delimitedList(generalized_var) + StringEnd() return args # 22.01.2006, c
[docs]class CharacteristicFunction(Struct): def __init__(self, region): self.igs = region.igs self.region = region self.local_chunk = None self.ig = None def __call__(self, chunk_size, shape_in, zero=False, set_shape=True, ret_local_chunk=False, dtype=nm.float64): els = self.region.get_cells(self.ig) for out, chunk in vector_chunk_generator(els.shape[0], chunk_size, shape_in, zero, set_shape, dtype): self.local_chunk = chunk if ret_local_chunk: yield out, chunk else: yield out, els[chunk] self.local_chunk = None
[docs] def set_current_group(self, ig): self.ig = ig
[docs] def get_local_chunk(self): return self.local_chunk
[docs]class ConnInfo(Struct):
[docs] def get_region(self, can_trace=True): if self.is_trace and can_trace: return self.region.get_mirror_region()[0] else: return self.region
[docs] def get_region_name(self, can_trace=True): if self.is_trace and can_trace: reg = self.region.get_mirror_region()[0] else: reg = self.region if reg is not None: return reg.name else: return None
[docs] def iter_igs(self): if self.region is not None: for ig in self.region.igs: if self.virtual_igs is not None: ir = self.virtual_igs.tolist().index(ig) rig = self.virtual_igs[ir] else: rig = None if not self.is_trace: ii = ig else: ig_map_i = self.region.get_mirror_region()[2] ii = ig_map_i[ig] if self.state_igs is not None: ic = self.state_igs.tolist().index(ii) cig = self.state_igs[ic] else: cig = None yield rig, cig else: yield None, None
[docs]class Terms(Container): @staticmethod
[docs] def from_desc(term_descs, regions, integrals=None): """ Create terms, assign each term its region. """ from sfepy.terms import term_table terms = Terms() for td in term_descs: try: constructor = term_table[td.name] except: msg = "term '%s' is not in %s" % (td.name, sorted(term_table.keys())) raise ValueError(msg) try: region = regions[td.region] except IndexError: raise KeyError('region "%s" does not exist!' % td.region) term = Term.from_desc(constructor, td, region, integrals=integrals) terms.append(term) return terms
def __init__(self, objs=None): Container.__init__(self, objs=objs) self.update_expression()
[docs] def insert(self, ii, obj): Container.insert(self, ii, obj) self.update_expression()
[docs] def append(self, obj): Container.append(self, obj) self.update_expression()
[docs] def update_expression(self): self.expression = [] for term in self: aux = [term.sign, term.name, term.arg_str, term.integral_name, term.region.name] self.expression.append(aux)
def __mul__(self, other): out = Terms() for name, term in self.iteritems(): out.append(term * other) return out def __rmul__(self, other): return self * other def __add__(self, other): if isinstance(other, Term): out = self.copy() out.append(other) elif isinstance(other, Terms): out = Terms(self._objs + other._objs) else: raise ValueError('cannot add Terms with %s!' % other) return out def __radd__(self, other): return self + other def __sub__(self, other): if isinstance(other, Term): out = self + (-other) elif isinstance(other, Terms): out = self + (-other) else: raise ValueError('cannot subtract Terms with %s!' % other) return out def __rsub__(self, other): return -self + other def __pos__(self): return self def __neg__(self): return -1.0 * self
[docs] def setup(self): for term in self: term.setup()
[docs] def assign_args(self, variables, materials, user=None): """ Assign all term arguments. """ for term in self: term.assign_args(variables, materials, user)
[docs] def get_variable_names(self): out = [] for term in self: out.extend(term.get_variable_names()) return list(set(out))
[docs] def get_material_names(self): out = [] for term in self: out.extend(term.get_material_names()) return list(set(out))
[docs] def get_user_names(self): out = [] for term in self: out.extend(term.get_user_names()) return list(set(out))
[docs] def set_current_group(self, ig): for term in self: term.char_fun.set_current_group(ig)
[docs]class Term(Struct): name = '' arg_types = () arg_shapes = {} integration = 'volume' geometries = ['1_2', '2_3', '2_4', '3_4', '3_8'] @staticmethod
[docs] def new(name, integral, region, **kwargs): from sfepy.terms import term_table arg_str = _match_args(name) if arg_str is not None: name, arg_str = arg_str.groups() else: raise ValueError('bad term syntax! (%s)' % name) if name in term_table: constructor = term_table[name] else: msg = "term '%s' is not in %s" % (name, sorted(term_table.keys())) raise ValueError(msg) obj = constructor(name, arg_str, integral, region, **kwargs) return obj
@staticmethod
[docs] def from_desc(constructor, desc, region, integrals=None): from sfepy.discrete import Integrals if integrals is None: integrals = Integrals() obj = constructor(desc.name, desc.args, None, region) obj.set_integral(integrals.get(desc.integral)) obj.sign = desc.sign return obj
def __init__(self, name, arg_str, integral, region, **kwargs): self.name = name self.arg_str = arg_str self.region = region self._kwargs = kwargs self._integration = self.integration self.sign = 1.0 self.set_integral(integral) def __mul__(self, other): try: mul = as_float_or_complex(other) except ValueError: raise ValueError('cannot multiply Term with %s!' % other) out = self.copy(name=self.name) out.sign = mul * self.sign return out def __rmul__(self, other): return self * other def __add__(self, other): if isinstance(other, Term): out = Terms([self, other]) else: out = NotImplemented return out def __sub__(self, other): if isinstance(other, Term): out = Terms([self, -1.0 * other]) else: out = NotImplemented return out def __pos__(self): return self def __neg__(self): out = -1.0 * self return out
[docs] def set_integral(self, integral): """ Set the term integral. """ self.integral = integral if self.integral is not None: self.integral_name = self.integral.name
[docs] def setup(self): self.char_fun = CharacteristicFunction(self.region) self.function = Struct.get(self, 'function', None) self.step = 0 self.dt = 1.0 self.is_quasistatic = False self.has_region = True self.setup_formal_args() if self._kwargs: self.setup_args(**self._kwargs) else: self.args = []
[docs] def setup_formal_args(self): self.arg_names = [] self.arg_steps = {} self.arg_derivatives = {} self.arg_traces = {} parser = create_arg_parser() self.arg_desc = parser.parseString(self.arg_str) for arg in self.arg_desc: trace = False derivative = None if isinstance(arg[1], int): name, step = arg else: kind = arg[0] name, step = arg[1] if kind == 'd': derivative = arg[2] elif kind == 'tr': trace = True match = _match_material_root(name) if match: name = (match.group(1), match.group(2)) self.arg_names.append(name) self.arg_steps[name] = step self.arg_derivatives[name] = derivative self.arg_traces[name] = trace
[docs] def setup_args(self, **kwargs): self._kwargs = kwargs self.args = [] for arg_name in self.arg_names: if isinstance(arg_name, basestr): self.args.append(self._kwargs[arg_name]) else: self.args.append((self._kwargs[arg_name[0]], arg_name[1])) self.classify_args() self.check_args()
def __call__(self, diff_var=None, chunk_size=None, **kwargs): """ Subclasses either implement __call__ or plug in a proper _call(). """ return self._call(diff_var, chunk_size, **kwargs) def _call(self, diff_var=None, chunk_size=None, **kwargs): msg = 'base class method "_call" called for %s' \ % self.__class__.__name__ raise RuntimeError(msg)
[docs] def assign_args(self, variables, materials, user=None): """ Check term argument existence in variables, materials, user data and assign the arguments to terms. Also check compatibility of field and term subdomain lists (igs). """ if user is None: user = {} kwargs = {} for arg_name in self.arg_names: if isinstance(arg_name, basestr): if arg_name in variables.names: kwargs[arg_name] = variables[arg_name] elif arg_name in user: kwargs[arg_name] = user[arg_name] else: raise ValueError('argument %s not found!' % arg_name) else: arg_name = arg_name[0] if arg_name in materials.names: kwargs[arg_name] = materials[arg_name] else: raise ValueError('material argument %s not found!' % arg_name) self.setup_args(**kwargs)
[docs] def classify_args(self): """ Classify types of the term arguments and find matching call signature. A state variable can be in place of a parameter variable and vice versa. """ self.names = Struct(name='arg_names', material=[], variable=[], user=[], state=[], virtual=[], parameter=[]) # Prepare for 'opt_material' - just prepend a None argument if needed. if isinstance(self.arg_types[0], tuple): arg_types = self.arg_types[0] else: arg_types = self.arg_types if len(arg_types) == (len(self.args) + 1): self.args.insert(0, (None, None)) self.arg_names.insert(0, (None, None)) if isinstance(self.arg_types[0], tuple): assert_(len(self.modes) == len(self.arg_types)) # Find matching call signature using variable arguments - material # and user arguments are ignored! matched = [] for it, arg_types in enumerate(self.arg_types): arg_kinds = get_arg_kinds(arg_types) if self._check_variables(arg_kinds): matched.append((it, arg_kinds)) if len(matched) == 1: i_match, arg_kinds = matched[0] arg_types = self.arg_types[i_match] self.mode = self.modes[i_match] elif len(matched) == 0: msg = 'cannot match arguments! (%s)' % self.arg_names raise ValueError(msg) else: msg = 'ambiguous arguments! (%s)' % self.arg_names raise ValueError(msg) else: arg_types = self.arg_types arg_kinds = get_arg_kinds(self.arg_types) self.mode = Struct.get(self, 'mode', None) if not self._check_variables(arg_kinds): raise ValueError('cannot match variables! (%s)' % self.arg_names) # Set actual argument types. self.ats = list(arg_types) for ii, arg_kind in enumerate(arg_kinds): name = self.arg_names[ii] if arg_kind.endswith('variable'): names = self.names.variable if arg_kind == 'virtual_variable': self.names.virtual.append(name) elif arg_kind == 'state_variable': self.names.state.append(name) elif arg_kind == 'parameter_variable': self.names.parameter.append(name) elif arg_kind.endswith('material'): names = self.names.material else: names = self.names.user names.append(name) self.n_virtual = len(self.names.virtual) if self.n_virtual > 1: raise ValueError('at most one virtual variable is allowed! (%d)' % self.n_virtual) self.set_arg_types() self.setup_integration()
def _check_variables(self, arg_kinds): for ii, arg_kind in enumerate(arg_kinds): if arg_kind.endswith('variable'): var = self.args[ii] check = {'virtual_variable' : var.is_virtual, 'state_variable' : var.is_state_or_parameter, 'parameter_variable' : var.is_state_or_parameter} if not check[arg_kind](): return False else: return True
[docs] def set_arg_types(self): pass
[docs] def check_args(self): """ Common checking to all terms. Check compatibility of field and term subdomain lists (igs). """ vns = self.get_variable_names() for name in vns: field = self._kwargs[name].get_field() if field is None: continue if not nm.all(in1d(self.region.vertices, field.region.vertices)): msg = ('%s: incompatible regions: (self, field %s)' + '(%s in %s)') %\ (self.name, field.name, self.region.vertices, field.region.vertices) raise ValueError(msg)
[docs] def get_variable_names(self): return self.names.variable
[docs] def get_material_names(self): out = [] for aux in self.names.material: if aux[0] is not None: out.append(aux[0]) return out
[docs] def get_user_names(self): return self.names.user
[docs] def get_virtual_name(self): if not self.names.virtual: return None var = self.get_virtual_variable() return var.name
[docs] def get_state_names(self): """ If variables are given, return only true unknowns whose data are of the current time step (0). """ variables = self.get_state_variables() return [var.name for var in variables]
[docs] def get_parameter_names(self): return copy(self.names.parameter)
[docs] def get_conn_key(self): """The key to be used in DOF connectivity information.""" key = (self.name,) + tuple(self.arg_names) key += (self.integral_name, self.region.name) return key
[docs] def get_conn_info(self): vvar = self.get_virtual_variable() svars = self.get_state_variables() pvars = self.get_parameter_variables() all_vars = self.get_variables() dc_type = self.get_dof_conn_type() tgs = self.get_geometry_types() v_igs = v_tg = None if vvar is not None: field = vvar.get_field() if field is not None: v_igs = field.igs if vvar.name in tgs: v_tg = tgs[vvar.name] else: v_tg = None else: # No virtual variable -> all unknowns are in fact known parameters. pvars += svars svars = [] region = self.get_region() if region is not None: is_any_trace = reduce(lambda x, y: x or y, self.arg_traces.values()) if is_any_trace: region.setup_mirror_region() self.char_fun.igs = region.igs vals = [] aux_pvars = [] for svar in svars: # Allow only true state variables. if not svar.is_state(): aux_pvars.append(svar) continue field = svar.get_field() if field is not None: s_igs = field.igs else: s_igs = None is_trace = self.arg_traces[svar.name] if svar.name in tgs: ps_tg = tgs[svar.name] else: ps_tg = v_tg val = ConnInfo(virtual=vvar, virtual_igs=v_igs, state=svar, state_igs=s_igs, primary=svar, primary_igs=s_igs, has_virtual=True, has_state=True, is_trace=is_trace, dc_type=dc_type, v_tg=v_tg, ps_tg=ps_tg, region=region, all_vars=all_vars) vals.append(val) pvars += aux_pvars for pvar in pvars: field = pvar.get_field() if field is not None: p_igs = field.igs else: p_igs = None is_trace = self.arg_traces[pvar.name] if pvar.name in tgs: ps_tg = tgs[pvar.name] else: ps_tg = v_tg val = ConnInfo(virtual=vvar, virtual_igs=v_igs, state=None, state_igs=[], primary=pvar.get_primary(), primary_igs=p_igs, has_virtual=vvar is not None, has_state=False, is_trace=is_trace, dc_type=dc_type, v_tg=v_tg, ps_tg=ps_tg, region=region, all_vars=all_vars) vals.append(val) if vvar and (len(vals) == 0): # No state, parameter variables, just the virtual one. val = ConnInfo(virtual=vvar, virtual_igs=v_igs, state=vvar.get_primary(), state_igs=v_igs, primary=vvar.get_primary(), primary_igs=v_igs, has_virtual=True, has_state=False, is_trace=False, dc_type=dc_type, v_tg=v_tg, ps_tg=v_tg, region=region, all_vars=all_vars) vals.append(val) return vals
[docs] def get_args_by_name(self, arg_names): """ Return arguments by name. """ out = [] for name in arg_names: try: ii = self.arg_names.index(name) except ValueError: raise ValueError('non-existing argument! (%s)' % name) out.append(self.args[ii]) return out
[docs] def get_args(self, arg_types=None, **kwargs): """ Return arguments by type as specified in arg_types (or self.ats). Arguments in **kwargs can override the ones assigned at the term construction - this is useful for passing user data. """ ats = self.ats if arg_types is None: arg_types = ats args = [] region_name, iorder, ig = self.get_current_group() for at in arg_types: ii = ats.index(at) arg_name = self.arg_names[ii] if isinstance(arg_name, basestr): if arg_name in kwargs: args.append(kwargs[arg_name]) else: args.append(self.args[ii]) else: mat, par_name = self.args[ii] if mat is not None: mat_data = mat.get_data((region_name, iorder), ig, par_name) else: mat_data = None args.append(mat_data) return args
[docs] def get_kwargs(self, keys, **kwargs): """Extract arguments from **kwargs listed in keys (default is None).""" return [kwargs.get(name) for name in keys]
[docs] def get_arg_name(self, arg_type, full=False, join=None): """ Get the name of the argument specified by `arg_type.` Parameters ---------- arg_type : str The argument type string. full : bool If True, return the full name. For example, if the name of a variable argument is 'u' and its time derivative is requested, the full name is 'du/dt'. join : str, optional Optionally, the material argument name tuple can be joined to a single string using the `join` string. Returns ------- name : str The argument name. """ try: ii = self.ats.index(arg_type) except ValueError: return None name = self.arg_names[ii] if full: # Include derivatives. if self.arg_derivatives[name]: name = 'd%s/%s' % (name, self.arg_derivatives[name]) if (join is not None) and isinstance(name, tuple): name = join.join(name) return name
[docs] def setup_integration(self): self.has_geometry = True self.geometry_types = {} if isinstance(self.integration, basestr): for var in self.get_variables(): self.geometry_types[var.name] = self.integration else: if self.mode is not None: self.integration = self._integration[self.mode] if self.integration is not None: for arg_type, gtype in self.integration.iteritems(): var = self.get_args(arg_types=[arg_type])[0] self.geometry_types[var.name] = gtype gtypes = list(set(self.geometry_types.itervalues())) if 'surface_extra' in gtypes: self.dof_conn_type = 'volume' elif len(gtypes): self.dof_conn_type = gtypes[0]
[docs] def get_region(self): return self.region
[docs] def get_geometry_types(self): """ Returns ------- out : dict The required geometry types for each variable argument. """ return self.geometry_types
[docs] def get_current_group(self): return (self.region.name, self.integral.order, self.char_fun.ig)
[docs] def get_dof_conn_type(self): return Struct(name='dof_conn_info', type=self.dof_conn_type, region_name=self.region.name)
[docs] def set_current_group(self, ig): self.char_fun.set_current_group(ig)
[docs] def igs(self): return self.char_fun.igs
[docs] def get_assembling_cells(self, shape=None): """ Return the assembling cell indices into a DOF connectivity. """ cells = nm.arange(shape[0], dtype=nm.int32) return cells
[docs] def iter_groups(self): if self.dof_conn_type == 'point': igs = self.igs()[0:1] else: igs = self.igs() for ig in igs: if self.integration in ('volume', 'plate'): if not len(self.region.get_cells(ig)): continue self.set_current_group(ig) yield ig
[docs] def time_update(self, ts): if ts is not None: self.step = ts.step self.dt = ts.dt self.is_quasistatic = ts.is_quasistatic
[docs] def advance(self, ts): """ Advance to the next time step. Implemented in subclasses. """
[docs] def get_vector(self, variable): """Get the vector stored in `variable` according to self.arg_steps and self.arg_derivatives. Supports only the backward difference w.r.t. time.""" name = variable.name return variable(step=self.arg_steps[name], derivative=self.arg_derivatives[name])
[docs] def get_approximation(self, variable, get_saved=False): """ Return approximation corresponding to `variable`. Also return the corresponding geometry (actual or saved, according to `get_saved`). """ geo, _, key = self.get_mapping(variable, get_saved=get_saved, return_key=True) ig = key[2] ap = variable.get_approximation(ig) return ap, geo
[docs] def get_variables(self, as_list=True): if as_list: variables = self.get_args_by_name(self.names.variable) else: variables = {} for var in self.get_args_by_name(self.names.variable): variables[var.name] = var return variables
[docs] def get_virtual_variable(self): aux = self.get_args_by_name(self.names.virtual) if len(aux) == 1: var = aux[0] else: var = None return var
[docs] def get_state_variables(self, unknown_only=False): variables = self.get_args_by_name(self.names.state) if unknown_only: variables = [var for var in variables if (var.kind == 'unknown') and (self.arg_steps[var.name] == 0)] return variables
[docs] def get_parameter_variables(self): return self.get_args_by_name(self.names.parameter)
[docs] def get_materials(self, join=False): materials = self.get_args_by_name(self.names.material) for mat in materials: if mat[0] is None: materials.remove(mat) if join: materials = list(set(mat[0] for mat in materials)) return materials
[docs] def get_qp_key(self): """ Return a key identifying uniquely the term quadrature points. """ return (self.region.name, self.integral.order)
[docs] def get_physical_qps(self): """ Get physical quadrature points corresponding to the term region and integral. """ from sfepy.discrete.common.mappings import get_physical_qps, PhysicalQPs if self.integration == 'point': phys_qps = PhysicalQPs(self.region.igs) elif self.integration == 'plate': phys_qps = get_physical_qps(self.region, self.integral, map_kind='v') else: phys_qps = get_physical_qps(self.region, self.integral) return phys_qps
[docs] def get_mapping(self, variable, get_saved=False, return_key=False): """ Get the reference mapping from a variable. Notes ----- This is a convenience wrapper of Field.get_mapping() that initializes the arguments using the term data. """ integration = self.geometry_types[variable.name] is_trace = self.arg_traces[variable.name] if is_trace: region, ig_map, ig_map_i = self.region.get_mirror_region() ig = ig_map_i[self.char_fun.ig] else: region = self.region ig = self.char_fun.ig out = variable.field.get_mapping(ig, region, self.integral, integration, get_saved=get_saved, return_key=return_key) return out
[docs] def get_data_shape(self, variable): """ Get data shape information from variable. Notes ----- This is a convenience wrapper of FieldVariable.get_data_shape() that initializes the arguments using the term data. """ integration = self.geometry_types[variable.name] is_trace = self.arg_traces[variable.name] if is_trace: region, ig_map, ig_map_i = self.region.get_mirror_region() ig = ig_map_i[self.char_fun.ig] else: region = self.region ig = self.char_fun.ig out = variable.get_data_shape(ig, self.integral, integration, region.name) return out
[docs] def get(self, variable, quantity_name, bf=None, integration=None, step=None, time_derivative=None): """ Get the named quantity related to the variable. Notes ----- This is a convenience wrapper of Variable.evaluate() that initializes the arguments using the term data. """ name = variable.name step = get_default(step, self.arg_steps[name]) time_derivative = get_default(time_derivative, self.arg_derivatives[name]) integration = get_default(integration, self.geometry_types[name]) data = variable.evaluate(self.char_fun.ig, mode=quantity_name, region=self.region, integral=self.integral, integration=integration, step=step, time_derivative=time_derivative, is_trace=self.arg_traces[name], bf=bf) return data
[docs] def check_shapes(self, *args, **kwargs): """ Default implementation of function to check term argument shapes at run-time. """ pass
[docs] def standalone_setup(self): from sfepy.discrete import create_adof_conns, Variables conn_info = {'aux' : self.get_conn_info()} adcs = create_adof_conns(conn_info, None) variables = Variables(self.get_variables()) variables.set_adof_conns(adcs) materials = self.get_materials(join=True) for mat in materials: mat.time_update(None, [Struct(terms=[self])])
[docs] def call_get_fargs(self, args, kwargs): try: fargs = self.get_fargs(*args, **kwargs) except RuntimeError: terms.errclear() raise ValueError return fargs
[docs] def call_function(self, out, fargs): try: status = self.function(out, *fargs) except RuntimeError: terms.errclear() raise ValueError if status: terms.errclear() raise ValueError('term evaluation failed! (%s)' % self.name) return status
[docs] def eval_real(self, shape, fargs, mode='eval', term_mode=None, diff_var=None, **kwargs): out = nm.empty(shape, dtype=nm.float64) if mode == 'eval': status = self.call_function(out, fargs) # Sum over elements but not over components. out1 = nm.sum(out, 0).squeeze() return out1, status else: status = self.call_function(out, fargs) return out, status
[docs] def eval_complex(self, shape, fargs, mode='eval', term_mode=None, diff_var=None, **kwargs): rout = nm.empty(shape, dtype=nm.float64) fargsd = split_complex_args(fargs) # Assuming linear forms. Then the matrix is the # same both for real and imaginary part. rstatus = self.call_function(rout, fargsd['r']) if (diff_var is None) and len(fargsd) >= 2: iout = nm.empty(shape, dtype=nm.float64) istatus = self.call_function(iout, fargsd['i']) if mode == 'eval' and len(fargsd) >= 4: irout = nm.empty(shape, dtype=nm.float64) irstatus = self.call_function(irout, fargsd['ir']) riout = nm.empty(shape, dtype=nm.float64) ristatus = self.call_function(riout, fargsd['ri']) out = (rout - iout) + (riout + irout) * 1j status = rstatus or istatus or ristatus or irstatus else: out = rout + 1j * iout status = rstatus or istatus else: out, status = rout, rstatus if mode == 'eval': out1 = nm.sum(out, 0).squeeze() return out1, status else: return out, status
[docs] def evaluate(self, mode='eval', diff_var=None, standalone=True, ret_status=False, **kwargs): """ Evaluate the term. Parameters ---------- mode : 'eval' (default), or 'weak' The term evaluation mode. Returns ------- val : float or array In 'eval' mode, the term returns a single value (the integral, it does not need to be a scalar), while in 'weak' mode it returns an array for each element. status : int, optional The flag indicating evaluation success (0) or failure (nonzero). Only provided if `ret_status` is True. iels : array of ints, optional The local elements indices in 'weak' mode. Only provided in non-'eval' modes. """ if standalone: self.standalone_setup() kwargs = kwargs.copy() term_mode = kwargs.pop('term_mode', None) if mode == 'eval': val = 0.0 status = 0 for ig in self.iter_groups(): args = self.get_args(**kwargs) self.check_shapes(*args) _args = tuple(args) + (mode, term_mode, diff_var) fargs = self.call_get_fargs(_args, kwargs) shape, dtype = self.get_eval_shape(*_args, **kwargs) if dtype == nm.float64: _v, stat = self.eval_real(shape, fargs, mode, term_mode, **kwargs) elif dtype == nm.complex128: _v, stat = self.eval_complex(shape, fargs, mode, term_mode, **kwargs) else: raise ValueError('unsupported term dtype! (%s)' % dtype) val += _v status += stat val *= self.sign elif mode in ('el_avg', 'el', 'qp'): vals = None iels = nm.empty((0, 2), dtype=nm.int32) status = 0 for ig in self.iter_groups(): args = self.get_args(**kwargs) self.check_shapes(*args) _args = tuple(args) + (mode, term_mode, diff_var) fargs = self.call_get_fargs(_args, kwargs) shape, dtype = self.get_eval_shape(*_args, **kwargs) if dtype == nm.float64: val, stat = self.eval_real(shape, fargs, mode, term_mode, **kwargs) elif dtype == nm.complex128: val, stat = self.eval_complex(shape, fargs, mode, term_mode, **kwargs) if vals is None: vals = val else: vals = nm.r_[vals, val] _iels = self.get_assembling_cells(val.shape) aux = nm.c_[nm.repeat(ig, _iels.shape[0])[:, None], _iels[:, None]] iels = nm.r_[iels, aux] status += stat vals *= self.sign elif mode == 'weak': vals = [] iels = [] status = 0 varr = self.get_virtual_variable() if diff_var is not None: varc = self.get_variables(as_list=False)[diff_var] for ig in self.iter_groups(): args = self.get_args(**kwargs) self.check_shapes(*args) _args = tuple(args) + (mode, term_mode, diff_var) fargs = self.call_get_fargs(_args, kwargs) n_elr, n_qpr, dim, n_enr, n_cr = self.get_data_shape(varr) n_row = n_cr * n_enr if diff_var is None: shape = (n_elr, 1, n_row, 1) else: n_elc, n_qpc, dim, n_enc, n_cc = self.get_data_shape(varc) n_col = n_cc * n_enc shape = (n_elr, 1, n_row, n_col) if varr.dtype == nm.float64: val, stat = self.eval_real(shape, fargs, mode, term_mode, diff_var, **kwargs) elif varr.dtype == nm.complex128: val, stat = self.eval_complex(shape, fargs, mode, term_mode, diff_var, **kwargs) else: raise ValueError('unsupported term dtype! (%s)' % varr.dtype) vals.append(self.sign * val) iels.append((ig, self.get_assembling_cells(val.shape))) status += stat # Setup return value. if mode == 'eval': out = (val,) else: out = (vals, iels) if goptions['check_term_finiteness']: assert_(nm.isfinite(out[0]).all(), msg='%+.2e * %s.%d.%s(%s) term values not finite!' % (self.sign, self.name, self.integral.order, self.region.name, self.arg_str)) if ret_status: out = out + (status,) if len(out) == 1: out = out[0] return out
[docs] def assemble_to(self, asm_obj, val, iels, mode='vector', diff_var=None): import sfepy.discrete.fem.extmods.assemble as asm vvar = self.get_virtual_variable() dc_type = self.get_dof_conn_type() if mode == 'vector': if asm_obj.dtype == nm.float64: assemble = asm.assemble_vector else: assert_(asm_obj.dtype == nm.complex128) assemble = asm.assemble_vector_complex for ii in range(len(val)): if not(val[ii].dtype == nm.complex128): val[ii] = nm.complex128(val[ii]) for ii, (ig, _iels) in enumerate(iels): vec_in_els = val[ii] dc = vvar.get_dof_conn(dc_type, ig) assert_(vec_in_els.shape[2] == dc.shape[1]) assemble(asm_obj, vec_in_els, _iels, 1.0, dc) elif mode == 'matrix': if asm_obj.dtype == nm.float64: assemble = asm.assemble_matrix else: assert_(asm_obj.dtype == nm.complex128) assemble = asm.assemble_matrix_complex svar = diff_var tmd = (asm_obj.data, asm_obj.indptr, asm_obj.indices) for ii, (ig, _iels) in enumerate(iels): mtx_in_els = val[ii] if ((asm_obj.dtype == nm.complex128) and (mtx_in_els.dtype == nm.float64)): mtx_in_els = mtx_in_els.astype(nm.complex128) rdc = vvar.get_dof_conn(dc_type, ig) is_trace = self.arg_traces[svar.name] cdc = svar.get_dof_conn(dc_type, ig, is_trace=is_trace) assert_(mtx_in_els.shape[2:] == (rdc.shape[1], cdc.shape[1])) sign = 1.0 if self.arg_derivatives[svar.name]: if not self.is_quasistatic or (self.step > 0): sign *= 1.0 / self.dt else: sign = 0.0 assemble(tmd[0], tmd[1], tmd[2], mtx_in_els, _iels, sign, rdc, cdc) else: raise ValueError('unknown assembling mode! (%s)' % mode)