SfePy NTC

Source code for shaper

#!/usr/bin/env python
# 06.04.2005, c
# 16.06.2005
from optparse import OptionParser

import numpy as nm

import sfepy
from sfepy.base.base import output, remap_dict, Struct, IndexedStruct
from sfepy.base.conf import ProblemConf, get_standard_keywords
from sfepy.discrete.evaluate import BasicEvaluator
import sfepy.base.ioutils as io
import sfepy.optimize.shape_optim as so
from sfepy.discrete.problem import Problem
from sfepy.solvers import Solver

[docs]def solve_stokes(dpb, equations_stokes, nls_conf): dpb.set_equations(equations_stokes) dpb.time_update(None) output('solving Stokes problem...') state = dpb.solve(nls_conf=nls_conf) output('...done') return state
[docs]def solve_navier_stokes(conf, options): opts = conf.options dpb = Problem.from_conf(conf, init_equations=False) equations = getattr(conf, '_'.join(('equations_direct', opts.problem))) dpb.set_equations(equations) ls_conf = dpb.get_solver_conf( opts.ls ) nls_conf = dpb.get_solver_conf(opts.nls_direct) method = opts.direct_method if method == 'stationary': data = {} dpb.time_update(None) state_dp = dpb.solve(nls_conf=nls_conf) elif method == 'transient': ls = Solver.any_from_conf( ls_conf ) ts_conf = dpb.get_solver_conf( opts.ts_direct ) data = {'ts' : Struct( dt = ts_conf.dt )} # Plug in mass term. mequations = {} for key, eq in equations.iteritems(): if 'dw_div_grad' in eq: eq = '+'.join( (ts_conf.mass_term, eq) ).replace( '++', '+') mequations[key] = eq if ts_conf.stokes_init: state_dp0 = solve_stokes( dpb, conf.equations_direct_stokes, nls_conf ) dpb.set_equations( mequations ) else: dpb.set_equations( mequations ) state_dp0 = dpb.create_state() dpb.time_update( None ) state_dp0.apply_ebc() from sfepy.base.log import Log log = Log.from_conf( Struct( is_plot = True ), ([r'$||u||$'], [r'$||p||$']) ) output( 'Navier-Stokes...' ) ev = BasicEvaluator( dpb, ts = Struct( dt = ts_conf.dt ) ) nls = Solver.any_from_conf( nls_conf, evaluator = ev, lin_solver = ls ) n_step = ts_conf.n_step step = 0 while 1: for ii in xrange( n_step ): output( step ) vec_u = state_dp0('w') vec_p = state_dp0('r') log( nm.linalg.norm( vec_u ), nm.linalg.norm( vec_p ) ) dpb.variables.set_data_from_state('w_0', state_dp0(), 'w') vec_dp = nls( state_dp0() ) step += 1 state_dp = state_dp0.copy() state_dp.set_reduced(vec_dp) state_dp0 = state_dp if ts_conf.interactive: try: n_step = int( raw_input( 'continue: ' ) ) if n_step <= 0: break except: break vec_u = state_dp('w') vec_p = state_dp('r') log( nm.linalg.norm( vec_u ), nm.linalg.norm( vec_p ), finished = True ) else: raise 'unknown Navier-Stokes solution method (%s)!' % method return dpb, state_dp, data
[docs]def solve_generic_direct(conf, options): opts = conf.options dpb = Problem.from_conf(conf, init_equations=False) equations = getattr(conf, '_'.join(('equations_direct', opts.problem))) dpb.set_equations(equations) dpb.time_update(None) nls_conf = dpb.get_solver_conf(opts.nls_direct) state_dp = dpb.solve(nls_conf=nls_conf) return dpb, state_dp, {} ## # c: 22.11.2006, r: 15.04.2008
[docs]def solve_direct( conf, options ): """ Solve the direct (nonlinear) problem. """ opts = conf.options if hasattr( opts, 'problem' ): if opts.problem == 'navier_stokes': dpb, state_dp, data = solve_navier_stokes( conf, options ) else: output( 'unknown problem type (%s), using generic solver.'\ % opts.problem ) dpb, state_dp, data = solve_generic_direct( conf, options ) else: # Generic direct problem. dpb, state_dp, data = solve_generic_direct( conf, options ) trunk = io.get_trunk( conf.filename_mesh ) dpb.save_state( trunk + '_direct.vtk', state_dp ) if options.dump_filename is not None: import tables as pt import numarray as nar fd = pt.openFile( options.dump_filename, mode = 'w', title = "Dump file" ) out = state_dp.create_output_dict() for key, val in out.iteritems(): fd.createArray( fd.root, key, nar.asarray( val.data ), '%s data' % val.mode ) fd.close() if options.pert_mesh_filename is not None: coors0 = dpb.get_mesh_coors() # !!! # 'u' is here for displacements of le.py! vec_u = state_dp('u').copy() vec_u = vec_u.reshape( coors0.shape ) coors = coors0 + vec_u dpb.set_mesh_coors( coors ) dpb.domain.mesh.write( options.pert_mesh_filename, io = 'auto' ) return dpb, state_dp, data
[docs]def solve_adjoint(conf, options, dpb, state_dp, data): """ Solve the adjoint (linear) problem. """ opts = conf.options if dpb: apb = dpb.copy('adjoint') else: apb = Problem.from_conf(conf, init_equations=False) equations = getattr(conf, '_'.join(('equations_adjoint', opts.problem, opts.objective_function))) apb.set_equations(equations) apb.time_update(None) apb.ebcs.zero_dofs() apb.update_equations(None, ebcs=apb.ebcs) var_data = state_dp.get_parts() var_data = remap_dict(var_data, opts.var_map) nls_conf = apb.get_solver_conf(opts.nls_adjoint) state_ap = apb.solve(nls_conf=nls_conf, var_data=var_data) trunk = io.get_trunk(conf.filename_mesh) apb.save_state(trunk + '_adjoint.vtk', state_ap) shape_opt = so.ShapeOptimFlowCase.from_conf(conf, dpb, apb) if options.test is not None: ## # Test shape sensitivity. if shape_opt.test_terms_if_test: so.test_terms([options.test], opts.term_delta, shape_opt, var_data, state_ap) shape_opt.check_sensitivity([options.test], opts.delta, var_data, state_ap) ## # Compute objective function. val = shape_opt.obj_fun(state_dp) print 'actual obj_fun:', val ## # Compute shape sensitivity. vec_sa = shape_opt.sensitivity(var_data, state_ap) print 'actual sensitivity:', vec_sa ## # c: 22.11.2006, r: 15.04.2008
[docs]def solve_optimize( conf, options ): opts = conf.options trunk = io.get_trunk( conf.filename_mesh ) data = {} dpb = Problem.from_conf(conf, init_equations=False) equations = getattr( conf, '_'.join( ('equations_direct', opts.problem) ) ) dpb.set_equations( equations ) dpb.name = 'direct' dpb.time_update(None) apb = dpb.copy('adjoint') equations = getattr( conf, '_'.join( ('equations_adjoint', opts.problem, opts.objective_function) ) ) apb.set_equations( equations ) apb.time_update(None) apb.ebcs.zero_dofs() apb.update_equations(None, ebcs=apb.ebcs) ls_conf = dpb.get_solver_conf(opts.ls) dnls_conf = dpb.get_solver_conf(opts.nls_direct) anls_conf = dpb.get_solver_conf(opts.nls_adjoint) opt_conf = dpb.get_solver_conf(opts.optimizer) dpb.init_solvers(ls_conf=ls_conf, nls_conf=dnls_conf) apb.init_solvers(ls_conf=ls_conf, nls_conf=anls_conf) shape_opt = so.ShapeOptimFlowCase.from_conf(conf, dpb, apb) design0 = shape_opt.dsg_vars.val shape_opt.cache = Struct(design=design0 + 100, state=None, i_mesh=-1) opt_status = IndexedStruct() optimizer = Solver.any_from_conf(opt_conf, obj_fun=so.obj_fun, obj_fun_grad=so.obj_fun_grad, status=opt_status, obj_args=(shape_opt, opts)) ## # State problem solution for the initial design. vec_dp0 = so.solve_problem_for_design(dpb, design0, shape_opt, opts) dpb.save_state( trunk + '_direct_initial.vtk', vec_dp0 ) ## # Optimize. des = optimizer( design0 ) print opt_status ## # Save final state (for "optimal" design). dpb.domain.mesh.write( trunk + '_opt.mesh', io = 'auto' ) dpb.save_state(trunk + '_direct_current.vtk', shape_opt.cache.state) print des
usage = """%prog [options] filename_in""" help = { 'server_mode' : "run in server mode [default: %default], N/A", 'adjoint' : "solve adjoint problem [default: %default]", 'direct' : "solve direct problem [default: %default]", 'test' : "test sensitivity by finite difference," " using design variable idsg; switches on -a, -d", 'dump' : "dump direct problem state to filename", 'pert': "save displacement-perturbed mesh to filename", 'optimize' : "full shape optimization problem", } ## # created: 13.06.2005 # last revision: 15.04.2008
[docs]def main(): parser = OptionParser(usage = usage, version = "%prog " + sfepy.__version__) parser.add_option( "-s", "--server", action = "store_true", dest = "server_mode", default = False, help = help['server_mode'] ) parser.add_option( "-a", "--adjoint", action = "store_true", dest = "adjoint", default = False, help = help['adjoint'] ) parser.add_option( "-d", "--direct", action = "store_true", dest = "direct", default = False, help = help['direct'] ) parser.add_option( "-t", "--test", type = int, metavar = 'idsg', action = "store", dest = "test", default = None, help = help['test'] ) parser.add_option( "", "--dump", metavar = 'filename', action = "store", dest = "dump_filename", default = None, help = help['dump'] ) parser.add_option( "", "--pert-mesh", metavar = 'filename', action = "store", dest = "pert_mesh_filename", default = None, help = help['pert'] ) parser.add_option( "-f", "--full", action = "store_true", dest = "optimize", default = False, help = help['optimize'] ) options, args = parser.parse_args() if options.test is not None: options.adjoint = options.direct = True if options.optimize: options.adjoint = options.direct = False if ((len( args ) == 1) and (options.direct or options.adjoint or options.optimize)): filename_in = args[0]; else: parser.print_help(), return required, other = get_standard_keywords() required.remove('equations') if options.adjoint: required += ['equations_adjoint_.*', 'filename_vp', 'equations_direct_.*'] options.direct = True elif options.direct: required += ['equations_direct_.*'] elif options.optimize: required += ['equations_direct_.*', 'equations_adjoint_.*', 'equations_sensitivity_.*', 'filename_vp'] conf = ProblemConf.from_file( filename_in, required, other ) if options.direct: dpb, state_dp, data = solve_direct( conf, options ) else: dpb, state_dp, data = None, None, None if options.adjoint: solve_adjoint( conf, options, dpb, state_dp, data ) if options.optimize: solve_optimize( conf, options )
if __name__ == '__main__': main()