import re
from copy import copy
import numpy as nm
from sfepy.base.base import (as_float_or_complex, get_default, assert_,
Container, Struct, basestr, goptions)
from sfepy.base.compat import in1d
# Used for imports in term files.
from sfepy.terms.extmods import terms
_match_args = re.compile(r'^([^\(\}]*)\((.*)\)$').match
_match_virtual = re.compile('^virtual$').match
_match_state = re.compile('^state(_[_a-zA-Z0-9]+)?$').match
_match_parameter = re.compile('^parameter(_[_a-zA-Z0-9]+)?$').match
_match_material = re.compile('^material(_[_a-zA-Z0-9]+)?$').match
_match_material_opt = re.compile('^opt_material(_[_a-zA-Z0-9]+)?$').match
_match_material_root = re.compile(r'(.+)\.(.*)').match
_match_ts = re.compile('^ts$').match
[docs]
def get_arg_kinds(arg_types):
"""
Translate `arg_types` of a Term to a canonical form.
Parameters
----------
arg_types : tuple of strings
The term argument types, as given in the `arg_types` attribute.
Returns
-------
arg_kinds : list of strings
The argument kinds - one of 'virtual_variable', 'state_variable',
'parameter_variable', 'opt_material', 'ts', 'user'.
"""
arg_kinds = []
for ii, arg_type in enumerate(arg_types):
if _match_virtual(arg_type):
arg_kinds.append('virtual_variable')
elif _match_state(arg_type):
arg_kinds.append('state_variable')
elif _match_parameter(arg_type):
arg_kinds.append('parameter_variable')
elif _match_material(arg_type):
arg_kinds.append('material')
elif _match_material_opt(arg_type):
arg_kinds.append('opt_material')
if ii > 0:
msg = 'opt_material at position %d, must be at 0!' % ii
raise ValueError(msg)
elif _match_ts(arg_type):
arg_kinds.append('ts')
else:
arg_kinds.append('user')
return arg_kinds
[docs]
def get_shape_kind(integration):
"""
Get data shape kind for given integration type.
"""
if integration in ('plate', 'facet_extra'):
shape_kind = 'cell'
elif integration in ('cell', 'facet', 'point'):
shape_kind = integration
else:
raise NotImplementedError('unsupported term integration! (%s)'
% integration)
return shape_kind
[docs]
def split_complex_args(args):
"""
Split complex arguments to real and imaginary parts.
Returns
-------
newargs : dictionary
Dictionary with lists corresponding to `args` such that each
argument of numpy.complex128 data type is split to its real and
imaginary part. The output depends on the number of complex
arguments in 'args':
- 0: list (key 'r') identical to input one
- 1: two lists with keys 'r', 'i' corresponding to real
and imaginary parts
- 2: output dictionary contains four lists:
- 'r' - real(arg1), real(arg2)
- 'i' - imag(arg1), imag(arg2)
- 'ri' - real(arg1), imag(arg2)
- 'ir' - imag(arg1), real(arg2)
"""
newargs = {}
cai = []
for ii, arg in enumerate(args):
if isinstance(arg, nm.ndarray) and (arg.dtype == nm.complex128):
cai.append(ii)
if len(cai) > 0:
newargs['r'] = list(args[:])
newargs['i'] = list(args[:])
arg1 = cai[0]
newargs['r'][arg1] = args[arg1].real.copy()
newargs['i'][arg1] = args[arg1].imag.copy()
if len(cai) == 2:
arg2 = cai[1]
newargs['r'][arg2] = args[arg2].real.copy()
newargs['i'][arg2] = args[arg2].imag.copy()
newargs['ri'] = list(args[:])
newargs['ir'] = list(args[:])
newargs['ri'][arg1] = newargs['r'][arg1]
newargs['ri'][arg2] = newargs['i'][arg2]
newargs['ir'][arg1] = newargs['i'][arg1]
newargs['ir'][arg2] = newargs['r'][arg2]
elif len(cai) > 2:
raise NotImplementedError('more than 2 complex arguments! (%d)'
% len(cai))
else:
newargs['r'] = args[:]
return newargs
[docs]
def create_arg_parser(allow_derivatives=False):
from pyparsing import (Literal, Word, OneOrMore, delimitedList, Group,
StringStart, StringEnd, Combine, Optional, nums,
alphas, alphanums)
ident = Word(alphas, alphanums + "_")
inumber = Word("+-" + nums, nums)
history = Optional(Literal('[').suppress() + inumber
+ Literal(']').suppress(), default=0)("history")
history.setParseAction(lambda str, loc, toks: int(toks[0]))
variable = Group(Word(alphas, alphanums + '._') + history)
derivative = Group(Literal('d') + variable\
+ Literal('/').suppress() + Literal('dt'))
trace = Group(Literal('tr')
+ Literal('(').suppress()
+ Optional(ident + Literal(',').suppress(), default=None)
+ variable
+ Literal(')').suppress())
if allow_derivatives:
derivative2 = Group(Combine(OneOrMore(Literal('d'))) + variable)
generalized_var = derivative | derivative2 | trace | variable
else:
generalized_var = derivative | trace | variable
args = StringStart() + delimitedList(generalized_var) + StringEnd()
return args
[docs]
class ConnInfo(Struct):
[docs]
def get_region(self, can_trace=True):
if self.trace_region is not None and can_trace:
return self.region.get_mirror_region(self.trace_region)
else:
return self.region
[docs]
def get_region_name(self, can_trace=True):
reg = self.get_region(can_trace)
if reg is not None:
return reg.name
else:
return None
[docs]
class Terms(Container):
[docs]
@staticmethod
def from_desc(term_descs, regions, integrals=None):
"""
Create terms, assign each term its region.
"""
from sfepy.terms import term_table
terms = Terms()
for td in term_descs:
try:
constructor = term_table[td.name]
except:
msg = "term '%s' is not in %s" % (td.name,
sorted(term_table.keys()))
raise ValueError(msg)
try:
region = regions[td.region]
except IndexError:
raise KeyError('region "%s" does not exist!' % td.region)
term = Term.from_desc(constructor, td, region, integrals=integrals)
terms.append(term)
return terms
def __init__(self, objs=None):
Container.__init__(self, objs=objs)
self.update_expression()
[docs]
def insert(self, ii, obj):
Container.insert(self, ii, obj)
self.update_expression()
[docs]
def append(self, obj):
Container.append(self, obj)
self.update_expression()
[docs]
def update_expression(self):
self.expression = []
for term in self:
aux = [term.sign, term.name, term.arg_str,
term.integral_name, term.region.name]
self.expression.append(aux)
def __mul__(self, other):
out = Terms()
for name, term in self.iteritems():
out.append(term * other)
return out
def __rmul__(self, other):
return self * other
def __add__(self, other):
if isinstance(other, Term):
out = self.copy()
out.append(other)
elif isinstance(other, Terms):
out = Terms(self._objs + other._objs)
else:
raise ValueError('cannot add Terms with %s!' % other)
return out
def __radd__(self, other):
return self + other
def __sub__(self, other):
if isinstance(other, Term):
out = self + (-other)
elif isinstance(other, Terms):
out = self + (-other)
else:
raise ValueError('cannot subtract Terms with %s!' % other)
return out
def __rsub__(self, other):
return -self + other
def __pos__(self):
return self
def __neg__(self):
return -1.0 * self
[docs]
def setup(self, **kwargs):
for term in self:
term.setup(**kwargs)
[docs]
def assign_args(self, variables, materials, user=None):
"""
Assign all term arguments.
"""
for term in self:
term.assign_args(variables, materials, user)
[docs]
def get_variable_names(self):
out = []
for term in self:
out.extend(term.get_variable_names())
return list(set(out))
[docs]
def get_material_names(self):
out = []
for term in self:
out.extend(term.get_material_names())
return list(set(out))
[docs]
def get_user_names(self):
out = []
for term in self:
out.extend(term.get_user_names())
return list(set(out))
[docs]
class Term(Struct):
name = ''
arg_types = ()
arg_geometry_types = {}
arg_shapes = {}
diff_info = {}
integration = 'cell'
integration_order = None # None = any
geometries = ['1_2', '2_3', '2_4', '3_4', '3_8']
[docs]
@staticmethod
def new(name_args, integral, region, **kwargs):
"""
Create a new Term instance.
Parameters
----------
name_args : str
The term name and arguments, e.g. 'dw_laplace(m.coef, v, u)'.
integral : Integral instance
The integral defining the term quadrature.
region : Region instance
The term region (domain of integration for most terms).
**kwargs : keyword arguments, optional
The actual term arguments. Their compatibility with `name_args` is
checked when a term is added to an Equation instance, or when
evaluated.
"""
from sfepy.terms import term_table
arg_str = _match_args(name_args)
if arg_str is not None:
name, arg_str = arg_str.groups()
else:
raise ValueError('bad term syntax! (%s)' % name)
if name in term_table:
constructor = term_table[name]
else:
msg = "term '%s' is not in %s" % (name, sorted(term_table.keys()))
raise ValueError(msg)
obj = constructor(name, arg_str, integral, region, **kwargs)
return obj
[docs]
@staticmethod
def from_desc(constructor, desc, region, integrals=None):
from sfepy.discrete import Integrals
if integrals is None:
integrals = Integrals()
integral = integrals.get(desc.integral)
obj = constructor(desc.name, desc.args, integral, region)
obj.sign = desc.sign
return obj
[docs]
@staticmethod
def tile_mat(mat, nel):
if mat.shape[0] == 1 and nel > 1:
return nm.tile(mat, (nel,) + (mat.ndim - 1) * (1,))
else:
return mat
def __init__(self, name, arg_str, integral, region, **kwargs):
self.name = name
self.arg_str = arg_str
self.region = region
self._kwargs = kwargs
self.sign = 1.0
self.set_integral(integral)
def __mul__(self, other):
try:
mul = as_float_or_complex(other)
except ValueError:
raise ValueError('cannot multiply Term with %s!' % other)
out = self.copy(name=self.name)
out.sign = mul * self.sign
return out
def __rmul__(self, other):
return self * other
def __add__(self, other):
if isinstance(other, Term):
out = Terms([self, other])
else:
out = NotImplemented
return out
def __sub__(self, other):
if isinstance(other, Term):
out = Terms([self, -1.0 * other])
else:
out = NotImplemented
return out
def __pos__(self):
return self
def __neg__(self):
out = -1.0 * self
return out
[docs]
def get_str(self):
return '{:+} * {}.{}.{}({})'.format(
self.sign, self.name, self.integral.order,
self.region.name, self.arg_str)
[docs]
def set_integral(self, integral):
"""
Set the term integral.
"""
if self.integration_order is not None:
if integral.order != self.integration_order:
raise ValueError(
f'integral order for "{self.name}" term must be'
f' {self.integration_order}! (is {integral.order}'
f' of integral "{integral.name}")'
)
self.integral = integral
if self.integral is not None:
self.integral_name = self.integral.name
[docs]
def setup(self, allow_derivatives=False):
self.function = Struct.get(self, 'function', None)
self.step = 0
self.dt = 1.0
self.is_quasistatic = False
self.has_region = True
self.setup_formal_args(allow_derivatives=allow_derivatives)
if self._kwargs:
self.setup_args(**self._kwargs)
else:
self.args = []
[docs]
def setup_args(self, **kwargs):
self._kwargs = kwargs
self.args = []
for ia, arg_name in enumerate(self.arg_names):
if isinstance(arg_name, basestr):
name, append = arg_name, None
else:
name, append = arg_name
arg = self._kwargs.get(name)
if arg is None:
raise ValueError(
f"term '{self.get_str()}': "
f"{ia+1}. term argument '{name}' not found!"
)
actual_name = getattr(arg, 'name', name)
if (actual_name != name):
raise ValueError(
f"term '{self.get_str()}': "
f"{ia+1}. term argument name '{name}' differs from "
f"the actual argument name '{actual_name}'!"
)
term_arg = arg if append is None else (arg, append)
self.args.append(term_arg)
self.classify_args()
self.check_args()
self.setup_geometry_types()
[docs]
def assign_args(self, variables, materials, user=None):
"""
Check term argument existence in variables, materials, user data
and assign the arguments to terms. Also check compatibility of
field and term regions.
"""
if user is None:
user = {}
user.setdefault('ts', Struct())
kwargs = {}
for arg_name in self.arg_names:
if isinstance(arg_name, basestr):
if arg_name in variables.names:
kwargs[arg_name] = variables[arg_name]
elif arg_name in user:
kwargs[arg_name] = user[arg_name]
else:
raise ValueError('argument %s not found!' % arg_name)
else:
arg_name = arg_name[0]
if arg_name in materials.names:
kwargs[arg_name] = materials[arg_name]
else:
raise ValueError('material argument %s not found!'
% arg_name)
self.setup_args(**kwargs)
[docs]
def classify_args(self):
"""
Classify types of the term arguments and find matching call
signature.
A state variable can be in place of a parameter variable and
vice versa.
"""
self.names = Struct(name='arg_names',
material=[], variable=[], user=[],
state=[], virtual=[], parameter=[])
# Prepare for 'opt_material' - just prepend a None argument if needed.
if isinstance(self.arg_types[0], tuple):
arg_types = self.arg_types[0]
else:
arg_types = self.arg_types
if len(arg_types) == (len(self.args) + 1):
self.args.insert(0, (None, None))
self.arg_names.insert(0, (None, None))
if isinstance(self.arg_types[0], tuple):
assert_(len(self.modes) == len(self.arg_types))
# Find matching call signature using variable arguments - material
# and user arguments are ignored!
matched = []
for it, arg_types in enumerate(self.arg_types):
arg_kinds = get_arg_kinds(arg_types)
if self._check_variables(arg_kinds):
matched.append((it, arg_kinds))
if len(matched) == 1:
i_match, arg_kinds = matched[0]
arg_types = self.arg_types[i_match]
self.mode = self.modes[i_match]
elif len(matched) == 0:
msg = 'cannot match arguments! (%s)' % self.arg_names
raise ValueError(msg)
else:
msg = 'ambiguous arguments! (%s)' % self.arg_names
raise ValueError(msg)
else:
arg_types = self.arg_types
arg_kinds = get_arg_kinds(self.arg_types)
self.mode = Struct.get(self, 'mode', None)
if not self._check_variables(arg_kinds):
raise ValueError('cannot match variables! (%s)'
% self.arg_names)
# Set actual argument types.
self.ats = list(arg_types)
for ii, arg_kind in enumerate(arg_kinds):
name = self.arg_names[ii]
if arg_kind.endswith('variable'):
names = self.names.variable
if arg_kind == 'virtual_variable':
self.names.virtual.append(name)
elif arg_kind == 'state_variable':
self.names.state.append(name)
elif arg_kind == 'parameter_variable':
self.names.parameter.append(name)
elif arg_kind.endswith('material'):
# This should be better checked already in create_arg_parser().
if not isinstance(name, tuple):
raise ValueError('wrong material argument %s of term %s!'
% (name, self.get_str()))
names = self.names.material
else:
names = self.names.user
names.append(name)
self.n_virtual = len(self.names.virtual)
if self.n_virtual > 1:
raise ValueError('at most one virtual variable is allowed! (%d)'
% self.n_virtual)
self.set_arg_types()
self.setup_integration()
def _check_variables(self, arg_kinds):
for ii, arg_kind in enumerate(arg_kinds):
if arg_kind.endswith('variable'):
var = self.args[ii]
check = {'virtual_variable' : var.is_virtual,
'state_variable' : var.is_state_or_parameter,
'parameter_variable' : var.is_state_or_parameter}
if not check[arg_kind]():
return False
else:
return True
[docs]
def set_arg_types(self):
pass
[docs]
def check_args(self):
"""
Common checking to all terms.
Check compatibility of field and term regions.
"""
vns = self.get_variable_names()
for name in vns:
field = self._kwargs[name].get_field()
if field is None:
continue
region = self.region
if self.arg_traces[name]:
mreg_name = self.arg_trace_regions[name]
if mreg_name is None:
mreg_name = region.setup_mirror_region(mreg_name,
ret_name=True)
self.arg_trace_regions[name] = mreg_name
else:
region.setup_mirror_region(mreg_name)
region = region.get_mirror_region(mreg_name)
if not nm.all(in1d(region.vertices,
field.region.vertices)):
msg = ('%s: incompatible regions: (self, field %s)'
+ '(%s in %s)') %\
(self.name, field.name,
self.region.vertices, field.region.vertices)
raise ValueError(msg)
[docs]
def get_variable_names(self):
return self.names.variable
[docs]
def get_material_names(self, part=0):
out = []
for aux in self.names.material:
if aux[0] is not None:
out.append(aux[part])
return out
[docs]
def get_user_names(self):
return self.names.user
[docs]
def get_virtual_name(self):
if not self.names.virtual:
return None
var = self.get_virtual_variable()
return var.name
[docs]
def get_state_names(self):
"""
If variables are given, return only true unknowns whose data are of
the current time step (0).
"""
variables = self.get_state_variables()
return [var.name for var in variables]
[docs]
def get_parameter_names(self):
return copy(self.names.parameter)
[docs]
def get_conn_key(self):
"""The key to be used in DOF connectivity information."""
key = (self.name,) + tuple(self.arg_names)
arg_traces = [k for k, v in self.arg_traces.items() if v]
if len(arg_traces) > 0:
atr = arg_traces[-1]
trace = self.arg_trace_regions[atr], atr
else:
trace = None, None
key += (self.integral_name, self.region.name) + trace
return key
[docs]
def get_conn_info(self):
vvar = self.get_virtual_variable()
svars = self.get_state_variables()
pvars = self.get_parameter_variables()
all_vars = self.get_variables()
dof_conn_types = {}
for var in all_vars:
aux = var.get_primary()
pvar = aux if aux is not None else var
dof_conn_types[pvar.name] = self.get_dof_conn_type(var.name)
if vvar is None:
# No virtual variable -> all unknowns are in fact known parameters.
pvars += svars
svars = []
region = self.get_region()
if region is not None:
arg_traces = [k for k, v in self.arg_traces.items() if v]
if len(arg_traces) > 0:
aname = arg_traces[-1]
mreg_name = self.arg_trace_regions[aname]
if mreg_name is None:
mreg_name = region.setup_mirror_region(mreg_name,
ret_name=True)
self.arg_trace_regions[aname] = mreg_name
else:
region.setup_mirror_region(mreg_name)
vals = []
aux_pvars = []
for svar in svars:
# Allow only true state variables.
if not svar.is_state():
aux_pvars.append(svar)
continue
field = svar.get_field()
trace_region = self.arg_trace_regions[svar.name]
val = ConnInfo(virtual=vvar,
state=svar,
primary=svar,
has_virtual=True,
has_state=True,
trace_region=trace_region,
dof_conn_types=dof_conn_types,
region=region,
all_vars=all_vars)
vals.append(val)
pvars += aux_pvars
for pvar in pvars:
field = pvar.get_field()
trace_region = self.arg_trace_regions[pvar.name]
val = ConnInfo(virtual=vvar,
state=None,
primary=pvar.get_primary(),
has_virtual=vvar is not None,
has_state=False,
trace_region=trace_region,
dof_conn_types=dof_conn_types,
region=region,
all_vars=all_vars)
vals.append(val)
if vvar and (len(vals) == 0):
# No state, parameter variables, just the virtual one.
val = ConnInfo(virtual=vvar,
state=vvar.get_primary(),
primary=vvar.get_primary(),
has_virtual=True,
has_state=False,
trace_region=None,
dof_conn_types=dof_conn_types,
region=region,
all_vars=all_vars)
vals.append(val)
return vals
[docs]
def get_args_by_name(self, arg_names):
"""
Return arguments by name.
"""
out = []
for name in arg_names:
try:
ii = self.arg_names.index(name)
except ValueError:
raise ValueError('non-existing argument! (%s)' % name)
out.append(self.args[ii])
return out
[docs]
def get_args(self, arg_types=None, **kwargs):
"""
Return arguments by type as specified in arg_types (or
self.ats). Arguments in **kwargs can override the ones assigned
at the term construction - this is useful for passing user data.
"""
ats = self.ats
if arg_types is None:
arg_types = ats
args = []
region_name, iorder = self.region.name, self.integral.order
for at in arg_types:
ii = ats.index(at)
arg_name = self.arg_names[ii]
if isinstance(arg_name, basestr):
if arg_name in kwargs:
args.append(kwargs[arg_name])
else:
args.append(self.args[ii])
else:
mat, par_name = self.args[ii]
if mat is not None:
mat_data = mat.get_data((region_name, iorder), par_name)
else:
mat_data = None
args.append(mat_data)
return args
[docs]
def get_kwargs(self, keys, **kwargs):
"""Extract arguments from **kwargs listed in keys (default is
None)."""
return [kwargs.get(name) for name in keys]
[docs]
def get_arg_name(self, arg_type, full=False, join=None):
"""
Get the name of the argument specified by `arg_type.`
Parameters
----------
arg_type : str
The argument type string.
full : bool
If True, return the full name. For example, if the name of a
variable argument is 'u' and its time derivative is
requested, the full name is 'du/dt'.
join : str, optional
Optionally, the material argument name tuple can be joined
to a single string using the `join` string.
Returns
-------
name : str
The argument name.
"""
try:
ii = self.ats.index(arg_type)
except ValueError:
return None
name = self.arg_names[ii]
if full:
# Include derivatives.
if self.arg_derivatives[name]:
name = 'd%s/%s' % (name, self.arg_derivatives[name])
if (join is not None) and isinstance(name, tuple):
name = join.join(name)
return name
[docs]
def setup_geometry_types(self):
self.geometry_types = {}
for var in self.get_variables():
mreg_name = self.arg_trace_regions[var.name]
if mreg_name is not None:
reg = self.region.get_mirror_region(mreg_name)
else:
reg = self.region
if isinstance(self.integration, tuple):
if reg.kind in self.integration:
integration = reg.kind
elif reg.kind == 'facet' and 'facet_extra' in self.integration:
integration = 'facet_extra'
else:
raise ValueError(f'region "{reg.name}" cannot be used as '
f'"{self.name}" term integration domain!')
else:
integration = self.integration
if self.arg_geometry_types:
ii = self.arg_names.index(var.name)
agt = self.arg_geometry_types.get((self.ats[ii], self.mode))
if agt is not None:
integration = agt.get(integration, integration)
self.geometry_types[var.name] = integration, reg.tdim
[docs]
def setup_integration(self):
self.has_geometry = True
reg = self.region
if isinstance(self.integration, tuple):
if reg.kind in self.integration:
integration = reg.kind
elif reg.kind == 'facet' and 'facet_extra' in self.integration:
integration = 'facet_extra'
else:
raise ValueError(f'region "{reg.name}" cannot be used as '
f'"{self.name}" term integration domain!')
else:
integration = self.integration
self.act_integration = integration
[docs]
def get_region(self):
return self.region
[docs]
def get_assembling_cells(self, shape=None):
"""
Return the assembling cell indices into a DOF connectivity.
"""
cells = nm.arange(shape[0], dtype=nm.int32)
return cells
[docs]
def time_update(self, ts):
if ts is not None:
self.step = ts.step
self.dt = ts.dt
self.is_quasistatic = ts.is_quasistatic
if 'ts' in self._kwargs:
self._kwargs['ts'].update(ts)
[docs]
def advance(self, ts):
"""
Advance to the next time step. Implemented in subclasses.
"""
[docs]
def get_vector(self, variable):
"""Get the vector stored in `variable` according to self.arg_steps
and self.arg_derivatives. Supports only the backward difference w.r.t.
time."""
name = variable.name
return variable(step=self.arg_steps[name],
derivative=self.arg_derivatives[name])
[docs]
def get_variables(self, as_list=True):
if as_list:
variables = self.get_args_by_name(self.names.variable)
else:
variables = {}
for var in self.get_args_by_name(self.names.variable):
variables[var.name] = var
return variables
[docs]
def get_virtual_variable(self):
aux = self.get_args_by_name(self.names.virtual)
if len(aux) == 1:
var = aux[0]
else:
var = None
return var
[docs]
def get_state_variables(self, unknown_only=False):
variables = self.get_args_by_name(self.names.state)
if unknown_only:
variables = [var for var in variables
if (var.kind == 'unknown') and
(self.arg_steps[var.name] == 0)]
return variables
[docs]
def get_parameter_variables(self):
return self.get_args_by_name(self.names.parameter)
[docs]
def get_materials(self, join=False):
materials = self.get_args_by_name(self.names.material)
for mat in materials:
if mat[0] is None:
materials.remove(mat)
if join:
materials = list(set(mat[0] for mat in materials))
return materials
[docs]
def get_qp_key(self):
"""
Return a key identifying uniquely the term quadrature points.
"""
return (self.region.name, self.integral.order)
[docs]
def get_physical_qps(self):
"""
Get physical quadrature points corresponding to the term region
and integral.
"""
from sfepy.discrete.common.mappings import get_physical_qps, PhysicalQPs
if self.act_integration == 'point':
phys_qps = PhysicalQPs()
else:
phys_qps = get_physical_qps(self.region, self.integral)
return phys_qps
[docs]
def get_mapping(self, variable, get_saved=False, return_key=False):
"""
Get the reference mapping from a variable.
Notes
-----
This is a convenience wrapper of Field.get_mapping() that
initializes the arguments using the term data.
"""
integration, _ = self.geometry_types[variable.name]
mreg_name = self.arg_trace_regions[variable.name]
if mreg_name is not None:
region = self.region.get_mirror_region(mreg_name)
else:
region = self.region
out = variable.field.get_mapping(region,
self.integral, integration,
get_saved=get_saved,
return_key=return_key)
return out
[docs]
def get_data_shape(self, variable):
"""
Get data shape information from variable.
Notes
-----
This is a convenience wrapper of FieldVariable.get_data_shape() that
initializes the arguments using the term data.
"""
integration, _ = self.geometry_types[variable.name]
mreg_name = self.arg_trace_regions[variable.name]
if mreg_name is not None:
region = self.region.get_mirror_region(mreg_name)
else:
region = self.region
out = variable.get_data_shape(self.integral, integration, region.name)
return out
[docs]
def get(self, variable, quantity_name, bf=None, integration=None,
step=None, time_derivative=None):
"""
Get the named quantity related to the variable.
Notes
-----
This is a convenience wrapper of Variable.evaluate() that
initializes the arguments using the term data.
"""
name = variable.name
step = get_default(step, self.arg_steps[name])
time_derivative = get_default(time_derivative,
self.arg_derivatives[name])
integration = get_default(integration, self.geometry_types[name][0])
data = variable.evaluate(mode=quantity_name,
region=self.region, integral=self.integral,
integration=integration,
step=step, time_derivative=time_derivative,
trace_region=self.arg_trace_regions[name],
bf=bf)
return data
[docs]
def check_shapes(self, *args, **kwargs):
"""
Check term argument shapes at run-time.
"""
from sfepy.base.base import output
from sfepy.mechanics.tensors import dim2sym
dim = self.region.field_dim if hasattr(self.region, 'field_dim')\
else self.region.dim
sym = dim2sym(dim)
def _parse_scalar_shape(sh):
if isinstance(sh, basestr):
if sh == 'D':
return dim
elif sh == 'D2':
return dim**2
elif sh == 'S':
return sym
elif sh == 'SD':
return sym * dim
elif sh == 'N': # General number.
return nm.inf
elif sh == 'str':
return 'str'
else:
return int(sh)
else:
return sh
def _parse_tuple_shape(sh):
if isinstance(sh, basestr):
return tuple((_parse_scalar_shape(ii.strip())
for ii in sh.split(',')))
else:
return (int(sh),)
arg_kinds = get_arg_kinds(self.ats)
if hasattr(self, 'arg_shapes_dict') and \
isinstance(self.arg_shapes_dict, dict):
integration = self.act_integration.split('_')[0]
arg_shapes_list = self.arg_shapes_dict[integration]
else:
arg_shapes_list = self.arg_shapes
if not isinstance(arg_shapes_list, list):
arg_shapes_list = [arg_shapes_list]
# Loop allowed shapes until a match is found, else error.
allowed_shapes = []
prev_shapes = {}
actual_shapes = {}
for _arg_shapes in arg_shapes_list:
# Unset shapes are taken from the previous iteration.
arg_shapes = copy(prev_shapes)
arg_shapes.update(_arg_shapes)
prev_shapes = arg_shapes
allowed_shapes.append(arg_shapes)
n_ok = 0
for ii, arg_kind in enumerate(arg_kinds):
if arg_kind in ('user', 'ts'):
n_ok += 1
continue
arg = args[ii]
key = '%s:%s' % (self.ats[ii], self.arg_names[ii])
if self.mode is not None:
extended_ats = self.ats[ii] + ('/%s' % self.mode)
else:
extended_ats = self.ats[ii]
try:
sh = arg_shapes[self.ats[ii]]
except KeyError:
sh = arg_shapes[extended_ats]
if arg_kind.endswith('variable'):
n_el, n_qp, _dim, n_en, n_c = self.get_data_shape(arg)
actual_shapes[key] = (n_c,)
shape = _parse_scalar_shape(sh[0] if isinstance(sh, tuple)
else sh)
if nm.isinf(shape):
n_ok += 1
else:
n_ok += shape == n_c
elif arg_kind.endswith('material'):
if arg is None: # Switched-off opt_material.
n_ok += sh is None
continue
if sh is None:
continue
prefix = ''
if isinstance(sh, basestr):
aux = tuple(ii.strip() for ii in sh.split(':'))
if len(aux) == 2:
prefix, sh = aux
if sh == 'str':
n_ok += isinstance(arg, basestr)
continue
shape = _parse_tuple_shape(sh)
ls = len(shape)
aarg = nm.array(arg, ndmin=1)
actual_shapes[key] = aarg.shape
# Substiture general dimension 'N' with actual value.
iinfs = nm.where(nm.isinf(shape))[0]
if len(iinfs):
shape = list(shape)
for iinf in iinfs:
shape[iinf] = aarg.shape[-ls+iinf]
shape = tuple(shape)
if (ls > 1) or (shape[0] > 1):
# Array.
n_ok += shape == aarg.shape[-ls:]
actual_shapes[key] = aarg.shape[-ls:]
elif (ls == 1) and (shape[0] == 1):
# Scalar constant or callable as term argument
from numbers import Number
n_ok += isinstance(arg, Number) or callable(arg)
else:
n_ok += 1
if n_ok == len(arg_kinds):
break
else:
term_str = self.get_str()
output('allowed argument shapes for term "%s":' % term_str)
output(allowed_shapes)
output('actual argument shapes:')
output(actual_shapes)
raise ValueError('wrong arguments shapes for "%s" term! (see above)'
% term_str)
[docs]
def standalone_setup(self):
from sfepy.discrete import create_adof_conns, Variables
conn_info = {'aux' : self.get_conn_info()}
adcs = create_adof_conns(conn_info, None)
variables = Variables(self.get_variables())
variables.set_adof_conns(adcs)
materials = self.get_materials(join=True)
for mat in materials:
mat.time_update(None, [Struct(terms=[self])])
[docs]
def call_get_fargs(self, args, kwargs):
try:
fargs = self.get_fargs(*args, **kwargs)
except (RuntimeError, ValueError):
terms.errclear()
raise
return fargs
[docs]
@staticmethod
def translate_fargs_mapping(function, fargs, force=False):
key0 = 'builtin_function_or_method' # cython < 3
key1 = 'cython_function_or_method' # cython >= 3
fs = [k.__class__.__name__ for k in fargs]
is_tr = lambda key: (key in function.__class__.__name__ or key in fs)
if is_tr(key0) or is_tr(key1) or force:
for ii, k in enumerate(fs):
if k == 'PyCMapping':
fargs[ii] = fargs[ii].cmap
return fargs
[docs]
def call_function(self, out, fargs):
fargs = self.translate_fargs_mapping(self.function, list(fargs))
try:
status = self.function(out, *fargs)
except (RuntimeError, ValueError):
terms.errclear()
raise
if status:
terms.errclear()
raise ValueError('term evaluation failed! (%s)' % self.name)
return status
[docs]
def eval_real(self, shape, fargs, mode='eval', term_mode=None,
diff_var=None, **kwargs):
out = nm.empty(shape, dtype=nm.float64)
if mode == 'eval':
status = self.call_function(out, fargs)
# Sum over elements but not over components.
out1 = nm.sum(out, 0).squeeze()
return out1, status
else:
status = self.call_function(out, fargs)
return out, status
[docs]
def eval_complex(self, shape, fargs, mode='eval', term_mode=None,
diff_var=None, **kwargs):
rout = nm.empty(shape, dtype=nm.float64)
fargsd = split_complex_args(fargs)
rstatus = self.call_function(rout, fargsd['r'])
if len(fargsd) >= 2:
iout = nm.empty(shape, dtype=nm.float64)
istatus = self.call_function(iout, fargsd['i'])
if len(fargsd) >= 4:
irout = nm.empty(shape, dtype=nm.float64)
irstatus = self.call_function(irout, fargsd['ir'])
riout = nm.empty(shape, dtype=nm.float64)
ristatus = self.call_function(riout, fargsd['ri'])
out = (rout - iout) + (riout + irout) * 1j
status = rstatus or istatus or ristatus or irstatus
else:
out = rout + 1j * iout
status = rstatus or istatus
else:
out, status = rout + 0j, rstatus
if mode == 'eval':
out1 = nm.sum(out, 0).squeeze()
return out1, status
else:
return out, status
[docs]
def evaluate(self, mode='eval', diff_var=None,
standalone=True, ret_status=False, **kwargs):
"""
Evaluate the term.
Parameters
----------
mode : 'eval' (default), or 'weak'
The term evaluation mode.
Returns
-------
val : float or array
In 'eval' mode, the term returns a single value (the
integral, it does not need to be a scalar), while in 'weak'
mode it returns an array for each element.
status : int, optional
The flag indicating evaluation success (0) or failure
(nonzero). Only provided if `ret_status` is True.
iels : array of ints, optional
The local elements indices in 'weak' mode. Only provided in
non-'eval' modes.
"""
if (mode == 'eval') and not (hasattr(self, 'get_eval_shape')):
raise ValueError(
f'term "{self.name}" cannot be evaluated in "eval" mode!'
)
if standalone:
self.standalone_setup()
kwargs = kwargs.copy()
term_mode = kwargs.pop('term_mode', None)
if mode in ('eval', 'el_eval', 'el_avg', 'qp'):
args = self.get_args(**kwargs)
self.check_shapes(*args)
emode = 'eval' if mode == 'el_eval' else mode
_args = tuple(args) + (emode, term_mode, diff_var)
shape, dtype = self.get_eval_shape(*_args, **kwargs)
if shape[0] == 0:
val = nm.zeros(shape, dtype=dtype)
status = 0
else:
fargs = self.call_get_fargs(_args, kwargs)
if dtype == nm.float64:
val, status = self.eval_real(shape, fargs, mode,
term_mode,
**kwargs)
elif dtype == nm.complex128:
val, status = self.eval_complex(shape, fargs, mode,
term_mode,
**kwargs)
else:
raise ValueError('unsupported term dtype! (%s)' % dtype)
val *= self.sign
out = (val,)
elif mode == 'weak':
varr = self.get_virtual_variable()
if varr is None:
raise ValueError('no virtual variable in weak mode! (in "%s")'
% self.get_str())
if diff_var is not None:
tvariables = self.get_variables(as_list=False)
if diff_var in tvariables:
varc = tvariables[diff_var]
elif diff_var in self.get_material_names(part=1):
varc = None
ii = self.get_material_names(part=1).index(diff_var)
diff_var = self.ats[ii]
else:
raise ValueError(f'variable "{diff_var}" is neither in'
' term variables nor in term.diff_info!')
args = self.get_args(**kwargs)
self.check_shapes(*args)
n_elr, n_qpr, dim, n_enr, n_cr = self.get_data_shape(varr)
n_row = n_cr * n_enr
if diff_var is None:
shape = (n_elr, 1, n_row, 1)
else:
if varc is not None:
n_elc, n_qpc, dim, n_enc, n_cc = self.get_data_shape(varc)
n_col = n_cc * n_enc
else:
n_col = self.diff_info[diff_var]
shape = (n_elr, 1, n_row, n_col)
if shape[0] == 0:
vals = nm.zeros(shape, dtype=varr.dtype)
status = 0
else:
_args = tuple(args) + (mode, term_mode, diff_var)
fargs = self.call_get_fargs(_args, kwargs)
if varr.dtype == nm.float64:
vals, status = self.eval_real(shape, fargs, mode,
term_mode,
diff_var, **kwargs)
elif varr.dtype == nm.complex128:
vals, status = self.eval_complex(shape, fargs, mode,
term_mode,
diff_var, **kwargs)
else:
raise ValueError('unsupported term dtype! (%s)'
% varr.dtype)
if not isinstance(vals, tuple):
vals *= self.sign
iels = self.get_assembling_cells(vals.shape)
else:
vals = (self.sign * vals[0],) + vals[1:]
iels = None
out = (vals, iels)
if goptions['check_term_finiteness']:
assert_(nm.isfinite(out[0]).all(),
msg='"%s" term values not finite!' % self.get_str())
if ret_status:
out = out + (status,)
if len(out) == 1:
out = out[0]
return out
[docs]
def get_dof_conn_type(self, var_name):
dct = self.geometry_types[var_name]
if dct[0] == 'facet_extra':
return ('cell', dct[1])
else:
return dct
[docs]
def assemble_to(self, asm_obj, val, iels, mode='vector', diff_var=None):
"""
Assemble the results of term evaluation.
For standard terms, assemble the values in `val` corresponding to
elements/cells `iels` into a vector or a CSR sparse matrix `asm_obj`,
depending on `mode`.
For terms with a dynamic connectivity (e.g. contact terms), in
`'matrix'` mode, return the extra COO sparse matrix instead. The extra
matrix has to be added to the global matrix by the caller. By default,
this is done in :func:`Equations.evaluate()
<sfepy.discrete.equations.Equations.evaluate()>`.
"""
import sfepy.discrete.common.extmods.assemble as asm
vvar = self.get_virtual_variable()
rname = self.region.name
extra = None
rdct = self.get_dof_conn_type(vvar.name)
if mode == 'vector':
if asm_obj.dtype == nm.float64:
assemble = asm.assemble_vector
else:
assert_(asm_obj.dtype == nm.complex128)
assemble = asm.assemble_vector_complex
for ii in range(len(val)):
if not(val[ii].dtype == nm.complex128):
val[ii] = nm.complex128(val[ii])
if not isinstance(val, tuple):
dc = vvar.get_dof_conn(rname, rdct)
assert_(val.shape[2] == dc.shape[1])
assemble(asm_obj, val, iels, 1.0, dc)
else:
vals, rows, var = val
if var.eq_map is not None:
eq = var.eq_map.eq
rows = eq[rows]
active = (rows >= 0)
vals, rows = vals[active], rows[active]
# Assumes no repeated indices in rows!
asm_obj[rows] += vals
elif mode == 'matrix':
if asm_obj.dtype == nm.float64:
assemble = asm.assemble_matrix
else:
assert_(asm_obj.dtype == nm.complex128)
assemble = asm.assemble_matrix_complex
svar = diff_var
tmd = (asm_obj.data, asm_obj.indptr, asm_obj.indices)
if ((asm_obj.dtype == nm.complex128)
and (val.dtype == nm.float64)):
val = val.astype(nm.complex128)
sign = 1.0
if self.arg_derivatives[svar.name] == 'dt':
if not self.is_quasistatic or (self.step > 0):
sign *= 1.0 / self.dt
else:
sign = 0.0
if not isinstance(val, tuple):
rdc = vvar.get_dof_conn(rname, rdct)
trace_region = self.arg_trace_regions[svar.name]
cdct = self.get_dof_conn_type(svar.name)
cdc = svar.get_dof_conn(rname, cdct, trace_region)
assert_(val.shape[2:] == (rdc.shape[1], cdc.shape[1]))
assemble(tmd[0], tmd[1], tmd[2], val, iels, sign, rdc, cdc)
else:
from scipy.sparse import coo_matrix
vals, rows, cols, rvar, cvar = val
if rvar.eq_map is not None:
req, ceq = rvar.eq_map.eq, cvar.eq_map.eq
rows, cols = req[rows], ceq[cols]
active = (rows >= 0) & (cols >= 0)
vals, rows, cols = vals[active], rows[active], cols[active]
extra = coo_matrix((sign * vals, (rows, cols)),
shape=asm_obj.shape)
else:
raise ValueError('unknown assembling mode! (%s)' % mode)
return extra