From 90f3dfdcd3d7243db73ad1491264b8886d80e1e6 Mon Sep 17 00:00:00 2001 From: JuLa96 <206707905+JuLa96@users.noreply.github.com> Date: Thu, 12 Feb 2026 09:48:29 +0100 Subject: [PATCH 1/5] functions to generate generic debris-flow topography --- debrisframe/in1Utils/generateTopo.py | 807 ++++++++++++++++++++++ debrisframe/in1Utils/generateTopoCfg.ini | 139 ++++ debrisframe/out1Plot/outTopo.py | 148 ++++ debrisframe/runScripts/runGenerateTopo.py | 33 + 4 files changed, 1127 insertions(+) create mode 100644 debrisframe/in1Utils/generateTopo.py create mode 100644 debrisframe/in1Utils/generateTopoCfg.ini create mode 100644 debrisframe/out1Plot/outTopo.py create mode 100644 debrisframe/runScripts/runGenerateTopo.py diff --git a/debrisframe/in1Utils/generateTopo.py b/debrisframe/in1Utils/generateTopo.py new file mode 100644 index 0000000..f90ebd1 --- /dev/null +++ b/debrisframe/in1Utils/generateTopo.py @@ -0,0 +1,807 @@ +""" + Create generic/idealised topographies +""" + +# load modules +import logging +import numpy as np +import math +from scipy.stats import norm +from scipy.interpolate import griddata +import pathlib +from rasterio.crs import CRS + +from avaframe.in3Utils import geoTrans +import avaframe.in2Trans.rasterUtils as IOf + +# create local logger +# change log level in calling module to DEBUG to see log messages +log = logging.getLogger(__name__) + + +def getParabolaParams(cfg): + """Compute parameters for parabola""" + + # input parameters + C = float(cfg["TOPO"]["C"]) + fLens = float(cfg["TOPO"]["fLens"]) + meanAlpha = float(cfg["TOPO"]["meanAlpha"]) + + # If mean slope is given or distance to the start of the flat plane + if meanAlpha != 0: + fLen = C / np.tan(np.radians(meanAlpha)) + log.info("fLen computed from mean alpha: %.2f meters" % fLen) + else: + fLen = fLens + log.info("flen directly set to: %.2f meters" % fLen) + A = C / (fLen**2) + B = (-C * 2.0) / fLen + + return A, B, fLen + + +def getGridDefs(cfg): + # determine number of rows and columns to define domain + dx = float(cfg["TOPO"]["dx"]) + xEnd = float(cfg["TOPO"]["xEnd"]) + yEnd = float(cfg["TOPO"]["yEnd"]) + + return dx, xEnd, yEnd + + +def computeCoordGrid(dx, xEnd, yEnd): + + # Compute coordinate grid + xv = np.arange(0, xEnd + dx, dx) + yv = np.arange(-0.5 * yEnd, 0.5 * (yEnd + dx), dx) + nRows = len(yv) + nCols = len(xv) + x, y = np.meshgrid(xv, yv) + zv = np.zeros((nRows, nCols)) + + return xv, yv, zv, x, y, nRows, nCols + + +def flatplane(cfg): + """Compute coordinates of flat plane topography""" + + dx, xEnd, yEnd = getGridDefs(cfg) + + zElev = float(cfg["TOPO"]["zElev"]) + + xv, yv, zv, x, y, nRows, nCols = computeCoordGrid(dx, xEnd, yEnd) + + # Set elevation of surface + zv = zv + zElev + + # Log info here + log.info("Flatplane coordinates computed") + + return x, y, zv + + +def inclinedplane(cfg): + """Compute coordinates of inclined plane with given slope (meanAlpha)""" + + # input parameters + dx, xEnd, yEnd = getGridDefs(cfg) + + z0 = float(cfg["TOPO"]["z0"]) + meanAlpha = float(cfg["TOPO"]["meanAlpha"]) + + cFf = float(cfg["CHANNELS"]["cff"]) + cRadius = float(cfg["CHANNELS"]["cRadius"]) + + xv, yv, zv, x, y, nRows, nCols = computeCoordGrid(dx, xEnd, yEnd) + + # Set surface elevation from slope and max. elevation + zv = z0 - np.tan(np.radians(meanAlpha)) * x + + # If a channel shall be introduced + if cfg["TOPO"].getboolean("channel"): + # Compute cumulative distribution function and set horizontal extent of channel + c0 = norm.cdf(xv, 0, cFf) + cExtent = cRadius + yv = np.reshape(yv, (nRows, 1)) + + # if location within horizontal extent of channel, + # make half sphere shaped channel with radius given by channel horizontal extent + mask = np.zeros(np.shape(yv)) + mask[np.where(abs(yv) < cExtent)] = 1 + if cfg["TOPO"].getboolean("topoAdd"): + zv = zv + cExtent * c0 * (1.0 - np.sqrt(np.abs(1.0 - (np.square(yv) / (cExtent ** 2))))) * mask + mask = np.zeros(np.shape(yv)) + mask[np.where(abs(yv) >= cExtent)] = 1 + zv = zv + cExtent * c0 * mask + else: + zv = zv - cExtent * c0 * np.sqrt(np.abs(1.0 - (np.square(yv) / (cExtent ** 2)))) * mask + + # Log info here + log.info("Inclined plane coordinates computed") + + return x, y, zv + + +def addDrop(cfg, x, y, zv): + """Add a drop to a given topography + + The drop is added in the x drection + + Parameters + ---------- + cfg: configparser + configuration for generateTopo + x: 2D numpy array + x coordinate of the raster + y: 2D numpy array + y coordinate of the raster + zv: 2D numpy array + z coordinate of the raster + + Returns + ------- + zv: 2D numpy array + z coordinate of the raster taking the drop into account + """ + + # input parameters + dx, xEnd, yEnd = getGridDefs(cfg) + + xStartDrop = float(cfg["DROP"]["xStartDrop"]) + dxDrop = float(cfg["DROP"]["dxDrop"]) + alphaDrop = float(cfg["DROP"]["alphaDrop"]) + + # get zcoord + # deduce drop height from the drop steepness and length in x direction + dzDrop = dxDrop * np.tan(np.radians(alphaDrop)) + xEndDrop = xStartDrop + dxDrop + + nrows, ncols = np.shape(x) + # get the z coordinate of the point at the beginning of the drop + zIniDrop, _ = geoTrans.projectOnGrid( + xStartDrop * np.ones((nrows)), + y[:, 0], + np.vstack((zv[0, :], zv)), + csz=dx, + xllc=x[0, 0], + yllc=y[0, 0], + interp="bilinear", + ) + zIniDrop = np.tile(zIniDrop, (ncols, 1)).transpose() + # get the z coordinate of the point at the end of the drop + zEndDrop, _ = geoTrans.projectOnGrid( + xEndDrop * np.ones((nrows)), + y[:, 0], + np.vstack((zv[0, :], zv)), + csz=dx, + xllc=x[0, 0], + yllc=y[0, 0], + interp="bilinear", + ) + zEndDrop = np.tile(zEndDrop, (ncols, 1)).transpose() + # Set surface elevation from slope and max. elevation + zv = np.where( + ((x >= xStartDrop) & (x <= xEndDrop)), + zIniDrop - (x - xStartDrop) * np.tan(np.radians(alphaDrop)), + zv, + ) + zv = np.where(x > xEndDrop, zv - (dzDrop + zEndDrop - zIniDrop), zv) + + # Log info here + log.info("Added drop to the topography") + + return zv + + +def hockey(cfg): + """ + Compute coordinates of an inclined plane with a flat foreland defined by + total fall height z0, angle to flat foreland (meanAlpha) and a radius (rCirc) to + smooth the transition from inclined plane to flat foreland + """ + + # input parameters + rCirc = float(cfg["TOPO"]["rCirc"]) + meanAlpha = float(cfg["TOPO"]["meanAlpha"]) + z0 = float(cfg["TOPO"]["z0"]) + + cff = float(cfg["CHANNELS"]["cff"]) + cRadius = float(cfg["CHANNELS"]["cRadius"]) + cInit = float(cfg["CHANNELS"]["cInit"]) + cMustart = float(cfg["CHANNELS"]["cMustart"]) + cMuendFP = float(cfg["CHANNELS"]["cMuendFP"]) + + dx, xEnd, yEnd = getGridDefs(cfg) + + # Compute coordinate grid + xv, yv, zv, x, y, nRows, nCols = computeCoordGrid(dx, xEnd, yEnd) + + # Compute distance to flat foreland for given meanAlpha + x1 = z0 / np.tan(np.radians(meanAlpha)) + if x1 >= xEnd * 0.9: + log.warning( + "Your domain (xEnd) is to small or the slope angle (meanAlpha) to" + "shallow to produce a signifcant (>10 percent of domain, in your case:" + " %.2f m) flat foreland!" % (0.1 * (xEnd - dx)) + ) + + # Compute circle parameters for smoothing the transition + beta = 0.5 * (180.0 - (meanAlpha)) + xc = rCirc / np.tan(np.radians(beta)) + yc = xc * np.cos(np.radians(meanAlpha)) + xCirc = x1 + xc + # for plotting + d1 = np.tan(np.radians(beta)) * x1 + + # Set surface elevation + zv = np.zeros((nRows, nCols)) + mask = np.zeros(np.shape(x)) + mask[np.where(x < (x1 - yc))] = 1 + zv = zv + (z0 - np.tan(np.radians(meanAlpha)) * x) * mask + + mask = np.zeros(np.shape(x)) + mask[np.where(((x1 - yc) <= x) & (x <= (x1 + xc)))] = 1 + # rCirc + np.sqrt(rCirc**2 - (xv[m] - xCirc)**2) + zv = zv + (rCirc - np.sqrt(np.abs(rCirc ** 2 - (xCirc - x) ** 2))) * mask + + # If a channel shall be introduced + if cfg["TOPO"].getboolean("channel"): + # Compute cumulative distribution function - c1 for upper part (start) + # of channel and c2 for lower part (end) of channel + c1 = norm.cdf(xv, cMustart * (x1), cff) + c2 = 1.0 - norm.cdf(xv, cMuendFP * (x1), cff) + + # combine both into one function separated at the the middle of + # the channel longprofile location + mask = np.zeros(np.shape(xv)) + mask[np.where(xv < (x1 * (0.5 * (cMustart + cMuendFP))))] = 1 + c0 = c1 * mask + + mask = np.zeros(np.shape(xv)) + mask[np.where(xv >= (x1 * (0.5 * (cMustart + cMuendFP))))] = 1 + c0 = c0 + c2 * mask + + # Is the channel of constant width or narrowing + if cfg["TOPO"].getboolean("narrowing"): + cExtent = cInit * (1 - c0[:]) + (c0[:] * cRadius) + else: + cExtent = np.zeros(np.shape(xv)) + cRadius + + # Set surface elevation + mask = np.zeros(np.shape(y)) + mask[np.where(abs(y) < cExtent)] = 1 + # Add surface elevation modification introduced by channel + if cfg["TOPO"].getboolean("topoAdd"): + zv = zv + cExtent * c0 * (1.0 - np.sqrt(np.abs(1.0 - (np.square(y) / (cExtent ** 2))))) * mask + # outside of the channel, add layer of channel thickness + mask = np.zeros(np.shape(y)) + mask[np.where(abs(y) >= cExtent)] = 1 + zv = zv + cExtent * c0 * mask + else: + zv = zv - cExtent * c0 * np.sqrt(np.abs(1.0 - (np.square(y) / (cExtent ** 2)))) * mask + + # Log info here + log.info("Hockeystick coordinates computed") + + return x, y, zv + +def debrisFlowTopoAverage(cfg): + """ + Compute coordinates of an average parabolic-shaped slope as a generic topography for debris-flow simulations + defined by a 2nd-degree polynomial: ax**2 + bx + c + The parameters of the polynomial function are derived from real watershed profiles (Kessler 2019) + """ + # input parameters + C = float(cfg["TOPO"]["C"]) # C = 709 m + cff = float(cfg["CHANNELS"]["cff"]) + cRadius = float(cfg["CHANNELS"]["cRadius"]) + cInit = float(cfg["CHANNELS"]["cInit"]) + cMustart = float(cfg["CHANNELS"]["cMustart"]) + cMuend = float(cfg["CHANNELS"]["cMuend"]) + + # Get grid definitons + dx, xEnd, yEnd = getGridDefs(cfg) + + # Compute coordinate grid + xv, yv, zv, x, y, nRows, nCols = computeCoordGrid(dx, xEnd, yEnd) + + # If a channel shall be introduced + # Get parabola Parameters + [A, B, fLen] = getParabolaParams(cfg) # fLen = x0 = 1679 m + + # Set surface elevation + mask = np.zeros(np.shape(xv)) + mask[np.where(xv < fLen)] = 1 + zv = (A * xv ** 2 + B * xv + C) * mask + + # If a channel shall be introduced + if cfg["TOPO"].getboolean("channel"): + # Compute cumulative distribution function - c1 for upper part (start) + # of channel and c2 for lower part (end) of channel + c1 = norm.cdf(xv, cMustart * fLen, cff) + c2 = 1.0 - norm.cdf(xv, cMuend * fLen, cff) # cMuend = 0.62, cff = 120 + + # combine both into one function separated at the the middle of + # the channel longprofile location + mask_c1 = np.zeros(np.shape(xv)) + mask_c1[np.where(xv < (fLen * (0.5 * (cMustart + cMuend))))] = 1 + c0 = c1 * mask_c1 + + mask_c2 = np.zeros(np.shape(xv)) + mask_c2[np.where(xv >= (fLen * (0.5 * (cMustart + cMuend))))] = 1 + c0 = c0 + c2 * mask_c2 + + # Is the channel of constant width or narrowing + if cfg["TOPO"].getboolean("narrowing"): + # upper part of channel: constant width + mask_c1 = np.zeros(np.shape(xv)) + mask_c1[np.where(xv < (fLen * (0.5 * (cMustart + cMuend))))] = 1 + cExtent_c1 = np.zeros(np.shape(xv)) + cRadius + + # lower part of channel: narrowing + mask_c2 = np.zeros(np.shape(xv)) + mask_c2[np.where(xv >= (fLen * (0.5 * (cMustart + cMuend))))] = 1 + cExtent_c2 = cInit * (1 - c0[:]) + (c0[:] * cRadius) + + cExtent = cExtent_c1 * mask_c1 + cExtent_c2 * mask_c2 + else: + cExtent = np.zeros(np.shape(xv)) + cRadius + + # Set surface elevation + mask = np.zeros(np.shape(y)) + mask[np.where(abs(y) < cExtent)] = 1 + # Add surface elevation modification introduced by channel + if cfg["TOPO"].getboolean("topoAdd"): + zv = zv + cRadius * c0 * (1.0 - np.sqrt(np.abs(1.0 - (np.square(y) / (cExtent ** 2))))) * mask # changed from cExtent to cRadius + # outside of the channel, add layer of channel thickness + mask = np.zeros(np.shape(y)) + mask[np.where(abs(y) >= cExtent)] = 1 + mask_c2 = np.ones(np.shape(xv)) #added, smooth transition from upper to lower part of channel + c0 = c2 * mask_c2 #added to extend lower distribution to upper edge of topography + zv = zv + cRadius * mask * c0 # changed from cExtent to cRadius + else: + zv = zv - cExtent * c0 * np.sqrt(np.abs(1.0 - (np.square(y) / (cExtent ** 2)))) * mask + + # Log info here + log.info("Generic debris-flow topography is computed") + + return x, y, zv + +def parabola(cfg): + """ + Compute coordinates of a parabolically-shaped slope with a flat foreland + defined by total fall height C, angle (meanAlpha) or distance (fLen) to flat foreland + """ + + C = float(cfg["TOPO"]["C"]) + cff = float(cfg["CHANNELS"]["cff"]) + cRadius = float(cfg["CHANNELS"]["cRadius"]) + cInit = float(cfg["CHANNELS"]["cInit"]) + cMustart = float(cfg["CHANNELS"]["cMustart"]) + cMuend = float(cfg["CHANNELS"]["cMuend"]) + + # Get grid definitons + dx, xEnd, yEnd = getGridDefs(cfg) + + # Compute coordinate grid + xv, yv, zv, x, y, nRows, nCols = computeCoordGrid(dx, xEnd, yEnd) + + # Get parabola Parameters + [A, B, fLen] = getParabolaParams(cfg) + + # Set surface elevation + zv = np.ones((nRows, nCols)) + # initialize superimposed channel + superChannel = np.zeros(np.shape(xv)) + superDam = np.zeros(np.shape(xv)) + + zv = zv * ((-(B ** 2)) / (4.0 * A) + C) + mask = np.zeros(np.shape(xv)) + mask[np.where(xv < fLen)] = 1 + + zv = zv + (A * xv ** 2 + B * xv + C) * mask + + # If a channel shall be introduced + if cfg["TOPO"].getboolean("channel"): + c1 = norm.cdf(xv, cMustart * fLen, cff) + c2 = 1.0 - norm.cdf(xv, cMuend * fLen, cff) + + # combine both into one function separated at the the middle of + # the channel longprofile location + mask = np.zeros(np.shape(xv)) + mask[np.where(xv < (fLen * (0.5 * (cMustart + cMuend))))] = 1 + c0 = c1 * mask + + mask = np.zeros(np.shape(xv)) + mask[np.where(xv >= (fLen * (0.5 * (cMustart + cMuend))))] = 1 + c0 = c0 + c2 * mask + + # Is the channel of constant width or narrowing + if cfg["TOPO"].getboolean("narrowing"): + cExtent = cInit * (1 - c0[:]) + (c0[:] * cRadius) + else: + cExtent = np.zeros(nCols) + cRadius + + # Add surface elevation modification introduced by channel + mask = np.zeros(np.shape(y)) + mask[np.where(abs(y) < cExtent)] = 1 + if cfg["TOPO"].getboolean("topoAdd"): + superChannel = ( + superChannel + + cExtent * c0 * (1.0 - np.sqrt(np.abs(1.0 - (np.square(y) / (cExtent ** 2))))) * mask + ) + # outside of the channel, add layer of channel thickness + mask = np.zeros(np.shape(y)) + mask[np.where(abs(y) >= cExtent)] = 1 + superChannel = superChannel + cExtent * c0 * mask + else: + superChannel = ( + superChannel - cExtent * c0 * np.sqrt(np.abs(1.0 - (np.square(y) / (cExtent ** 2)))) * mask + ) + + if cfg["TOPO"].getboolean("dam"): + damPos = cfg["DAMS"].getfloat("damPos") + damHeight = cfg["DAMS"].getfloat("damHeight") + damWidth = cfg["DAMS"].getfloat("damWidth") + superDam = norm.pdf(xv, damPos * (-B / 2 / A), damWidth) + superDam = superDam / np.max(superDam) * damHeight + if not cfg["TOPO"].getboolean("topoAdd"): + superDam = superDam - cExtent * c0 + + # add channel and dam + zv = zv + np.maximum(superDam, superChannel) + + # Log info here + log.info("Parabola with flat outrun coordinates computed") + + return x, y, zv + + +def parabolaRotation(cfg): + """ + Compute coordinates of a parabolically-shaped slope with a flat foreland + defined by total fall height C, angle (meanAlpha) or distance (fLen) to flat foreland + One parabolic slope in x direction, one sloped with 45° and one with 60° + """ + + C = float(cfg["TOPO"]["C"]) + fFlat = float(cfg["TOPO"]["fFlat"]) + + # Get grid definitons + dx, xEnd, yEnd = getGridDefs(cfg) + + # Compute coordinate grid, with center in (0, 0) + xv = np.arange(-0.5 * xEnd, 0.5 * (xEnd + dx), dx) + yv = np.arange(-0.5 * yEnd, 0.5 * (yEnd + dx), dx) + nRows = len(yv) + nCols = len(xv) + xv, yv = np.meshgrid(xv, yv) + zv = np.zeros((nRows, nCols)) + + # Get parabola Parameters + [A, B, fLen] = getParabolaParams(cfg) + + # Set surface elevation + zv = np.ones((nRows, nCols)) + zv = zv * ((-(B ** 2)) / (4.0 * A) + C) + # compute polar coordinates + r = np.sqrt(xv**2 + yv**2) + theta = np.arctan2(-yv, xv) + + # add parabola aligned with x (going from left to right) + phi = math.pi + # rotation of the polar coord system to be aligned with the parabola direction + s = createParabolaAxis(phi, theta, r, zv, fLen, fFlat) + + mask = np.ones(np.shape(s)) + mask[np.where(theta < 2 * math.pi / 3)] = 0 + mask[np.where(theta <= -5 * math.pi / 8)] = 1 + mask[np.where(s > fLen)] = 0 + zv = zv + (A * s ** 2 + B * s + C) * mask + + # add parabola sloped 60° with x + phi = math.pi / 3 + # rotation of the polar coord system to be aligned with the parabola direction + s = createParabolaAxis(phi, theta, r, zv, fLen, fFlat) + + mask = np.ones(np.shape(s)) + mask[np.where(theta > 2 * math.pi / 3)] = 0 + mask[np.where(theta < math.pi / 24)] = 0 + mask[np.where(s > fLen)] = 0 + zv = zv + (A * s ** 2 + B * s + C) * mask + + # add parabola aligned with x (going from left to right) + phi = -math.pi / 4 + # rotation of the polar coord system to be aligned with the parabola direction + s = createParabolaAxis(phi, theta, r, zv, fLen, fFlat) + + # apply the parabola to the corresponding part of the dem + mask = np.ones(np.shape(s)) + mask[np.where(theta > math.pi / 24)] = 0 + mask[np.where(theta < -5 * math.pi / 8)] = 0 + mask[np.where(s > fLen)] = 0 + zv = zv + (A * s ** 2 + B * s + C) * mask + + # Log info here + log.info("Triple parabola with flat foreland coordinates computed") + + return xv, yv, zv + + +def createParabolaAxis(phi, theta, r, zv, fLen, fFlat): + """create the s coordinate for a lined sloped from theta - phi from x axis""" + # rotation of the polar coord system to be aligned with the parabola direction + gamma = theta - phi + gamma = np.where(gamma < 0, gamma + 2 * math.pi, gamma) + gamma = np.where(gamma >= 2 * math.pi, gamma - 2 * math.pi, gamma) + # compute the s in the cartesian coord system aligned with the parabola + s = r * np.cos(gamma) + # shift this so that origin is at the top of the parabola + s = -s + fLen + fFlat + # apply the parabola to the corresponding part of the dem + return s + + +def bowl(cfg): + """Compute coordinates of sphere with given radius (rBwol)""" + + # input parameters + rBwol = float(cfg["TOPO"]["rBowl"]) + + # Get grid definitions + dx, xEnd, yEnd = getGridDefs(cfg) + + # Compute coordinate grid + xv, yv, zv, x, y, nRows, nCols = computeCoordGrid(dx, xEnd, yEnd) + + # recompute xv yv and x, y as they are shifted + xv = np.arange(-0.5 * xEnd, 0.5 * (xEnd + dx), dx) + yv = np.arange(-0.5 * yEnd, 0.5 * (yEnd + dx), dx) + x, y = np.meshgrid(xv, yv) + + # Set surface elevation + zv = rBwol * np.ones((nRows, nCols)) + if cfg["TOPO"].getboolean("curvedSlope"): + radius = np.sqrt(x**2) + else: + radius = np.sqrt(x**2 + y**2) + mask = np.zeros(np.shape(x)) + mask[np.where(radius <= rBwol)] = 1 + zv = zv - (rBwol * np.sqrt(np.abs(1 - (radius / rBwol) ** 2))) * mask + if cfg["TOPO"].getboolean("curvedSlope"): + zv[x >= 0] = 0.0 + + # Log info here + log.info("Bowl coordinates computed") + + return x, y, zv + + +def helix(cfg): + """Compute coordinates of helix-shaped topography with given radius (rHelix)""" + + # input parameters + rHelix = float(cfg["TOPO"]["rHelix"]) + C = float(cfg["TOPO"]["C"]) + cff = float(cfg["CHANNELS"]["cff"]) + cRadius = float(cfg["CHANNELS"]["cRadius"]) + cInit = float(cfg["CHANNELS"]["cInit"]) + cMustart = float(cfg["CHANNELS"]["cMustart"]) + cMuend = float(cfg["CHANNELS"]["cMuend"]) + + # Get grid definitions + dx, xEnd, yEnd = getGridDefs(cfg) + + # Compute coordinate grid + xv, yv, zv, x, y, nRows, nCols = computeCoordGrid(dx, xEnd, yEnd) + + # recompute xv yv and x, y as they are shifted + xv = np.arange(-0.5 * xEnd, 0.5 * (xEnd + dx), dx) + yv = np.arange(-yEnd, 0 + dx, dx) + x, y = np.meshgrid(xv, yv) + + # Get parabola Parameters + [A, B, fLen] = getParabolaParams(cfg) + + # Set surface elevation + zv = np.ones((nRows, nCols)) + radius = np.sqrt(x**2 + y**2) + theta = np.arctan2(y, x) + np.pi + zv = zv * ((-(B ** 2)) / (4.0 * A) + C) + mask = np.zeros(np.shape(x)) + mask[np.where((theta * rHelix) < fLen)] = 1 + + zv = zv + (A * (theta * rHelix) ** 2 + B * (theta * rHelix) + C) * mask + + # If channel is introduced to topography + if cfg["TOPO"].getboolean("channel"): + c0 = np.zeros(np.shape(x)) + mask = np.zeros(np.shape(x)) + mask[np.where((theta * rHelix) < (0.5 * (cMustart + cMuend) * fLen))] = 1 + c0 = c0 + norm.cdf(theta * rHelix, cMustart * fLen, cff) * mask + mask = np.zeros(np.shape(x)) + mask[np.where((theta * rHelix) >= (0.5 * (cMustart + cMuend) * fLen))] = 1 + c0 = c0 + (1.0 - norm.cdf(theta * rHelix, cMuend * fLen, cff)) * mask + # c0 = np.ones(np.shape(zv)) + # If channel of constant width or becoming narrower in the middle + if cfg["TOPO"].getboolean("narrowing"): + cExtent = cInit * (1.0 - c0) + c0 * cRadius + else: + cExtent = cRadius + + if cfg["TOPO"].getboolean("topoAdd"): + zv = zv + c0 * cExtent + + # Inner and outer boundaries of the channel + boundIn = rHelix - cExtent + boundExt = rHelix + cExtent + + # Set channel + mask = np.zeros(np.shape(x)) + mask[np.where((radius >= rHelix) & (radius < boundExt))] = 1 + radius1 = radius - rHelix + zv = zv - cExtent * c0 * np.sqrt(np.abs(1.0 - (np.square(radius1) / np.square(cExtent)))) * mask + + mask = np.zeros(np.shape(x)) + mask[np.where((radius < rHelix) & (radius > boundIn))] = 1 + radius2 = rHelix - radius + zv = zv - cExtent * c0 * np.sqrt(np.abs(1.0 - (np.square(radius1) / np.square(cExtent)))) * mask + + # set last row at Center to fall height + indCols = int(0.5 * nCols) + zv[-1, 0:indCols] = C + + # Log info here + log.info("Helix coordinates computed") + + return x, y, zv + + +def pyramid(cfg): + """Generate a pyramid topography - in this case rectangular domain""" + + # get parameters from ini + meanAlpha = float(cfg["TOPO"]["meanAlpha"]) + z0 = float(cfg["TOPO"]["z0"]) + flatx = float(cfg["TOPO"]["flatx"]) + flaty = float(cfg["TOPO"]["flaty"]) + phi = float(cfg["TOPO"]["phi"]) + dx = float(cfg["TOPO"]["dx"]) + + # initialise pyramid corners and center point + points = np.asarray([[-1.0, -1.0, 0], [-1.0, 1.0, 0], [1.0, 1.0, 0], [1.0, -1, 0.0], [0.0, 0.0, 1.0]]) + dxPoints = abs(points[4, 0] - points[1, 0]) + + # compute elevation of the apex point for given angle of pyramid facets + zAlpha = dxPoints * np.tan(np.deg2rad(meanAlpha)) + points[4, 2] = zAlpha + dcoors = points * z0 / zAlpha + + # if desired rotate pyramid + if cfg["TOPO"].getboolean("flagRot"): + dcoorsRot = np.zeros((len(dcoors), 3)) + for m in range(len(dcoorsRot)): + dcoorsRot[m, 0] = np.cos(np.deg2rad(phi)) * dcoors[m, 0] - np.sin(np.deg2rad(phi)) * dcoors[m, 1] + dcoorsRot[m, 1] = np.sin(np.deg2rad(phi)) * dcoors[m, 0] + np.cos(np.deg2rad(phi)) * dcoors[m, 1] + dcoorsRot[m, 2] = dcoors[m, 2] + dcoorsFin = dcoorsRot + else: + dcoorsFin = dcoors + + # split into horizontal and vertical coordinate points + xyPoints = np.zeros((len(points), 2)) + xyPoints[:, 0] = dcoorsFin[:, 0] + xyPoints[:, 1] = dcoorsFin[:, 1] + zPoints = dcoorsFin[:, 2] + + # make meshgrid for final DEM + xv = np.arange(-flatx + np.amin(dcoorsFin[:, 0]), np.amax(dcoorsFin[:, 0]) + flatx, dx) + yv = np.arange(-flaty + np.amin(dcoorsFin[:, 1]), np.amax(dcoorsFin[:, 1]) + flaty, dx) + x, y = np.meshgrid(xv, yv) + + # interpolate appex point information to meshgrid + z = griddata(xyPoints, zPoints, (x, y), method="linear") + zNan = np.isnan(z) + z[zNan] = 0.0 + + dX = np.amax(dcoorsFin[:, 0]) + flatx - (-flatx + np.amin(dcoorsFin[:, 0])) + dY = np.amax(dcoorsFin[:, 1]) + flaty - (-flaty + np.amin(dcoorsFin[:, 1])) + log.info("domain extent pyramid- inx: %f, in y: %f" % (dX, dY)) + + return x, y, z + + +def writeDEM(cfg, z, outDir): + """Write topography information to file""" + nameExt = cfg["TOPO"]["demType"] + nRows = z.shape[0] + nCols = z.shape[1] + + # Read lower left center coordinates, cellsize and noDATA value + xllcenter = float(cfg["DEMDATA"]["xl"]) + yllcenter = float(cfg["DEMDATA"]["yl"]) + cellsize = float(cfg["TOPO"]["dx"]) + noDATA = float(cfg["DEMDATA"]["nodata_value"]) + demName = cfg["DEMDATA"]["demName"] + + # Save elevation data to file and add header lines + demFile = outDir / ("%s_%s_Topo" % (demName, nameExt)) + demHeader = { + "ncols": nCols, + "nrows": nRows, + "xllcenter": xllcenter, + "yllcenter": yllcenter, + "cellsize": cellsize, + "nodata_value": noDATA, + } + + transform = IOf.transformFromASCHeader(demHeader) + demHeader["transform"] = transform + demHeader["driver"] = "AAIGrid" + demHeader["crs"] = CRS() + + IOf.writeResultToRaster(demHeader, z, demFile, flip=False) + + # Log info here + log.info("DEM written to: %s/%s_%s_Topo" % (outDir, demName, nameExt)) + + +def generateTopo(cfg, avalancheDir): + """Compute coordinates of desired topography with given inputs""" + + # Which DEM type + demType = cfg["TOPO"]["demType"] + + log.info("DEM type is set to: %s" % demType) + + # Set Output directory + outDir = pathlib.Path(avalancheDir, "Inputs") + if outDir.is_dir(): + log.info("The new DEM is saved to %s" % (outDir)) + else: + log.error( + "Required folder structure: NameOfAvalanche/Inputs missing! \ + Run runInitializeProject first!" + ) + + # Call topography type + if demType == "FP": + [x, y, z] = flatplane(cfg) + + elif demType == "IP": + [x, y, z] = inclinedplane(cfg) + + elif demType == "PF": + [x, y, z] = parabola(cfg) + + elif demType == "TPF": + [x, y, z] = parabolaRotation(cfg) + + elif demType == "HS": + [x, y, z] = hockey(cfg) + + elif demType == "BL": + [x, y, z] = bowl(cfg) + + elif demType == "HX": + [x, y, z] = helix(cfg) + + elif demType == "PY": + [x, y, z] = pyramid(cfg) + + elif demType == "DFTA": + [x, y, z] = debrisFlowTopoAverage(cfg) + + # If a drop shall be introduced + if cfg["TOPO"].getboolean("drop"): + z = addDrop(cfg, x, y, z) + + # moves topo in z direction + if cfg["DEMDATA"]["zEdit"] != "": + z = z + cfg["DEMDATA"].getfloat("zEdit") + log.info("Changed topo elevation by %.2f" % cfg["DEMDATA"].getfloat("zEdit")) + + # Write DEM to file + writeDEM(cfg, z, outDir) + + return (z, demType, outDir) diff --git a/debrisframe/in1Utils/generateTopoCfg.ini b/debrisframe/in1Utils/generateTopoCfg.ini new file mode 100644 index 0000000..deb9e47 --- /dev/null +++ b/debrisframe/in1Utils/generateTopoCfg.ini @@ -0,0 +1,139 @@ +### Config File - This file contains the main settings for the topography generation +## Set your parameters +# This file is part of DebrisFrame. + + +# General Topography parameters ---------------------- +[TOPO] +# DEM spatial resolution [m] +dx = 5. + +# total horizontal extent of the domain [m] +xEnd = 2000 + +# total horizontal extent of the domain [m] +yEnd = 1500 + +# topography type +# demType - topography type options: +# FP (Flat plane), IP (Inclined plane) [dx, xEnd, yEnd, zElev] +# PF (Parabolic slope with flat foreland) [dx, xEnd, yEnd, fLens or meanAlpha, C, optional:channel, dam] +# TPF (Triple parabolic slope with flat foreland) [dx, xEnd, yEnd, fLens, fFlat, C] +# HS (Hockeystick with linear slope and flat foreland and smooth transition) [dx, xEnd, yEnd, meanAlpha, z0, rCirc, optional:channel] +# BL (Bowl-shaped topography) [dx, xEnd, yEnd, rBowl] +# HX (Helix-shaped topography) [dx, xEnd, yEnd, flens or meanAlpha, C, rHelix, optional:channel] +# PY (pyramid-shaped topography, optional with flat foreland) [dx, xEnd, yEnd, meanAlpha, z0, optional:flatx, flaty, phi] +# DFTA (generic debris-flow topography) [dx, xEnd, yEnd, fLens, meanAlpha = 0, channel, narrowing] +demType = DFTA + +# distance to point where slope transitions into flat plane [m] - required for PF, HX, DFTA if meanAlpha is not provided +fLens = 1679 + +# legnth [m] of the flat foreground for the triple parabola for TPF +fFlat = 500 + +# slope angle from max. elevation to start flat plane [°] - or slope of inclined plane [°] +# this parameter required for IP, HS, PY, (PF, HX - if not fLens is used) +meanAlpha = 0 + +# total fall height [m] - required for PF, HX, DFTA +C = 709 + +# bowl radius [m] - required for BL +rBowl = 500 + +# radius for helix [m] - required for HX +rHelix = 1250 + +# max elevation of inclined plane [m] - required for IP, HS, PY +z0 = 2200 + +# elevation of flat plane [m] - required for FP +zElev = 0 + +# radius of smoothing circle [m] - required for HS +rCirc = 200 + +# flatland in front of pyramid - required for PY - optional +flatx = 2500 +flaty = 2500 + +# flags to set channel and narrowing of channel and if channel is cut into +# topo or a layer is added use topoAdd, True = additional topo is superimposed, False = additional topo is cut out +channel = True +narrowing = True +topoAdd = True +# dam option for PF topo +dam = False +# Add a step/drop in the topography (add a step in the x direction) +drop = False + +# flag if pyramid topography is rotated along z-axis +flagRot = True +# rotation angle along z-axis [degree] +phi = 25. + +# flag if not full bowl but only curved slope in x-direction +curvedSlope = False +#------------------------------------------------------ + + +# Channel parameters ----------------------------------- +[CHANNELS] +# standard channel radius - Gerinnebreite +cRadius = 20 + +# start and end half width of channel that is narrowing in the middle part +cInit = 250 + # mean mu - represents upper part of the channel (e.g. 10% of sloping topography part) +cMustart = 0.1 + +# mean mu - represents lower part of the channel (e.g. 62% of sloping topography part) +cMuend = 0.62 + +# mean mu - represents lower part of the channel (86% of sloping topography part) - flat plane +cMuendFP = 0.86 + +# standard deviation sigma +cff = 120 +#-------------------------------------------------------------- + +# DAM parameters ----------------------------------- +[DAMS] +# relative position with respect to where flat outrun starts +damPos = 0.6 + +damWidth = 75 + +damHeight = 75 + +# Add a drop in the topography (in x direction) +[DROP] +# angle of slope of the step in [°] +alphaDrop = 80 + +# x position of the start of the drop +xStartDrop = 1000 +# length of the drop in the x direction +dxDrop = 20 + +#DEM outputfile parameters--------------------------------- +[DEMDATA] +# x coordinate of lower left center +xl = 1000.0 + +# y-coordinate of lower left center +yl = -5000.0 + +# Prefix of DEM file name +demName = DEM + +# no data value +nodata_value = -9999 +#--------------------------------------------------------- + +# moves topography in z direction e.g. +100m or -100m +zEdit = + + + diff --git a/debrisframe/out1Plot/outTopo.py b/debrisframe/out1Plot/outTopo.py new file mode 100644 index 0000000..f7ab130 --- /dev/null +++ b/debrisframe/out1Plot/outTopo.py @@ -0,0 +1,148 @@ +""" + Simple plotting for DEMs +""" + +import matplotlib.pyplot as plt +from mpl_toolkits.mplot3d import Axes3D +import os +import numpy as np +import logging +from matplotlib.colors import LightSource +from matplotlib import cm + +from scipy.interpolate import griddata +# local imports +from avaframe.in1Data import getInput +from avaframe.in3Utils import geoTrans +import avaframe.out3Plot.plotUtils as pU + +# create local logger +log = logging.getLogger(__name__) + + +def _generateDEMPlot(X, Y, z, title): + """Generates 3d DEM plot, use this to style the plot""" + + fig = plt.figure(figsize=(10, 10)) + ax = plt.axes(projection='3d') + + ls = LightSource(270, 45) + # To use a custom hillshading mode, override the built-in shading and pass + # in the rgb colors of the shaded surface calculated from "shade". + rgb = ls.shade(z, cmap=cm.viridis, vert_exag=0.1, blend_mode='soft') + surf = ax.plot_surface(X, Y, z, rstride=1, cstride=1, facecolors=rgb, + linewidth=0, antialiased=False, shade=False) + + # These are other options to plot in 3d in case another look is needed + # ax.contour(X, Y, z, 20, linewidth=3, colors="g", linestyles="solid") + # ax.plot_wireframe(X, Y, z, rstride=100, cstride=100,lw=1) + # ax.plot_surface(X, Y, z, cmap=plt.cm.viridis, + # linewidth=0, antialiased=False) + + ax.set_title('DEM: %s' % title) + ax.set_xlabel('x [m]') + ax.set_ylabel('y [m]') + ax.set_zlabel('elevation (m)') + + return ax, fig + + +def plotDEM3D(cfg, showPlot = False): + """Plots the DEM from the avalancheDir in cfg alongside it + + Parameters + ---------- + cfg : configparser object + the main configuration + showPlot : boolean + If true shows the matplotlib plot + + """ + + # get avalanche dir + avalancheDir = cfg['MAIN']['avalancheDir'] + avaName = os.path.basename(avalancheDir) + log.info('Plotting DEM in : %s', avalancheDir) + + # read DEM + dem = getInput.readDEM(avalancheDir) + + # get DEM Path + demPath = getInput.getDEMPath(avalancheDir) + + header = dem['header'] + xl = header['xllcenter'] + yl = header['yllcenter'] + ncols = header['ncols'] + nrows = header['nrows'] + dx = header['cellsize'] + z = dem['rasterData'] + + # this line is needed for plot_surface to be able to handle the nans + z[np.isnan(z)] = np.nanmin(z) + + # Set coordinate grid with given origin + X, Y = geoTrans.makeCoordinateGrid(xl, yl, dx, ncols, nrows) + + ax, fig = _generateDEMPlot(X, Y, z, avaName) + + # Save figure to file + outName = demPath.stem + '_plot' + + # save and or show figure + plotPath = pU.saveAndOrPlot({'pathResult': avalancheDir}, outName, fig) + log.info('Saving plot to: %s', plotPath) + + +def plotGeneratedDEM(z, nameExt, cfg, outDir, cfgMain): + """ Plot DEM with given information on the origin of the DEM """ + + cfgTopo = cfg['TOPO'] + cfgDEM = cfg['DEMDATA'] + + # input parameters + dx = float(cfgTopo['dx']) + xl = float(cfgDEM['xl']) + yl = float(cfgDEM['yl']) + demName = cfgDEM['demName'] + + # Set coordinate grid with given origin + nrows, ncols = z.shape + X, Y = geoTrans.makeCoordinateGrid(xl, yl, dx, ncols, nrows) + + topoNames = {'IP': 'inclined Plane', 'FP': 'flat plane', 'PF': 'parabola flat', 'TPF': 'triple parabola flat', + 'HS': 'Hockeystick smoothed', 'BL': 'bowl', 'HX': 'Helix', 'PY': 'Pyramid', 'DFTA': 'generic debris-flow topography'} + + ax, fig = _generateDEMPlot(X, Y, z, topoNames[nameExt]) + + # Save figure to file + outName = '%s_%s_plot' % (demName, nameExt) + # save and or show figure + plotPath = pU.saveAndOrPlot({'pathResult': outDir}, outName, fig) + log.info('Saving plot to: %s', plotPath) + + +def plotReleasePoints(xv, yv, xyPoints, demType): + fig1, ax = plt.subplots(ncols=2, nrows=1, figsize=(pU.figW * 3, pU.figH * 2)) + ax[0].plot(xv, np.zeros(len(xv)) + yv[0], "k-") + ax[0].plot(xv, np.zeros(len(xv)) + yv[-1], "k-") + ax[0].plot(np.zeros(len(yv)) + xv[0], yv, "k-") + ax[0].plot(np.zeros(len(yv)) + xv[-1], yv, "k-") + ax[0].plot(xyPoints[:, 0], xyPoints[:, 1], "r*") + ax[0].set_title("Domain and release area of %s - projected" % demType) + ax[0].set_xlabel("along valley [m]") + ax[0].set_ylabel("across valley [m]") + + ax[1].plot(xyPoints[:, 0], xyPoints[:, 1], "r*") + ax[1].set_xlabel("along valley [m]") + ax[1].set_ylabel("across valley [m]") + + xLength = abs(xyPoints[0, 0] - xyPoints[3, 0]) + yLength = abs(xyPoints[0, 1] - xyPoints[1, 1]) + infoText = "xLength: %.2f \n yLength: %.2f \n" % (xLength, yLength) + props = dict(boxstyle="round", facecolor='white') + ax[1].text( + 0.5, 0.5, infoText, transform=ax[1].transAxes, fontsize=14, verticalalignment="top", bbox=props + ) + + plt.show() diff --git a/debrisframe/runScripts/runGenerateTopo.py b/debrisframe/runScripts/runGenerateTopo.py new file mode 100644 index 0000000..633fa66 --- /dev/null +++ b/debrisframe/runScripts/runGenerateTopo.py @@ -0,0 +1,33 @@ +#!/usr/bin/env python +""" + Run script for generateTopo in module in3Utils +""" +import logging +import pathlib + +# Local imports +from debrisframe.in1Utils import generateTopo as gT +from debrisframe.out1Plot import outTopo as oT +from avaframe.in3Utils import cfgUtils, logUtils + + +# log file name; leave empty to use default runLog.log +logName = 'generateTopo' + +# Load avalanche directory from general configuration file +cfgMain = cfgUtils.getGeneralConfig(nameFile=pathlib.Path("debrisframeCfg.ini")) +debrisDir = cfgMain["MAIN"]["avalancheDir"] +# avalancheDir = cfgMain['MAIN']['avalancheDir'] + +# Start logging +log = logUtils.initiateLogger(debrisDir, logName) +log.info('MAIN SCRIPT') + +# Load input parameters from configuration file +cfg = cfgUtils.getModuleConfig(gT, debrisDir) + +# Call main function to generate DEMs +[z, name_ext, outDir] = gT.generateTopo(cfg, debrisDir) + +# Plot new topogrpahy +oT.plotGeneratedDEM(z, name_ext, cfg, outDir, cfgMain) From 8904389499be7db6a51472092b46b5e30dcf53b2 Mon Sep 17 00:00:00 2001 From: JuLa96 <206707905+JuLa96@users.noreply.github.com> Date: Thu, 12 Feb 2026 12:13:57 +0100 Subject: [PATCH 2/5] Update in1Utils/generateTopo.py --- debrisframe/in1Utils/generateTopo.py | 681 +-------------------------- 1 file changed, 16 insertions(+), 665 deletions(-) diff --git a/debrisframe/in1Utils/generateTopo.py b/debrisframe/in1Utils/generateTopo.py index f90ebd1..a443a03 100644 --- a/debrisframe/in1Utils/generateTopo.py +++ b/debrisframe/in1Utils/generateTopo.py @@ -5,285 +5,15 @@ # load modules import logging import numpy as np -import math from scipy.stats import norm -from scipy.interpolate import griddata import pathlib -from rasterio.crs import CRS -from avaframe.in3Utils import geoTrans -import avaframe.in2Trans.rasterUtils as IOf +# local imports +import avaframe.in3Utils.generateTopo as genTop # create local logger # change log level in calling module to DEBUG to see log messages -log = logging.getLogger(__name__) - - -def getParabolaParams(cfg): - """Compute parameters for parabola""" - - # input parameters - C = float(cfg["TOPO"]["C"]) - fLens = float(cfg["TOPO"]["fLens"]) - meanAlpha = float(cfg["TOPO"]["meanAlpha"]) - - # If mean slope is given or distance to the start of the flat plane - if meanAlpha != 0: - fLen = C / np.tan(np.radians(meanAlpha)) - log.info("fLen computed from mean alpha: %.2f meters" % fLen) - else: - fLen = fLens - log.info("flen directly set to: %.2f meters" % fLen) - A = C / (fLen**2) - B = (-C * 2.0) / fLen - - return A, B, fLen - - -def getGridDefs(cfg): - # determine number of rows and columns to define domain - dx = float(cfg["TOPO"]["dx"]) - xEnd = float(cfg["TOPO"]["xEnd"]) - yEnd = float(cfg["TOPO"]["yEnd"]) - - return dx, xEnd, yEnd - - -def computeCoordGrid(dx, xEnd, yEnd): - - # Compute coordinate grid - xv = np.arange(0, xEnd + dx, dx) - yv = np.arange(-0.5 * yEnd, 0.5 * (yEnd + dx), dx) - nRows = len(yv) - nCols = len(xv) - x, y = np.meshgrid(xv, yv) - zv = np.zeros((nRows, nCols)) - - return xv, yv, zv, x, y, nRows, nCols - - -def flatplane(cfg): - """Compute coordinates of flat plane topography""" - - dx, xEnd, yEnd = getGridDefs(cfg) - - zElev = float(cfg["TOPO"]["zElev"]) - - xv, yv, zv, x, y, nRows, nCols = computeCoordGrid(dx, xEnd, yEnd) - - # Set elevation of surface - zv = zv + zElev - - # Log info here - log.info("Flatplane coordinates computed") - - return x, y, zv - - -def inclinedplane(cfg): - """Compute coordinates of inclined plane with given slope (meanAlpha)""" - - # input parameters - dx, xEnd, yEnd = getGridDefs(cfg) - - z0 = float(cfg["TOPO"]["z0"]) - meanAlpha = float(cfg["TOPO"]["meanAlpha"]) - - cFf = float(cfg["CHANNELS"]["cff"]) - cRadius = float(cfg["CHANNELS"]["cRadius"]) - - xv, yv, zv, x, y, nRows, nCols = computeCoordGrid(dx, xEnd, yEnd) - - # Set surface elevation from slope and max. elevation - zv = z0 - np.tan(np.radians(meanAlpha)) * x - - # If a channel shall be introduced - if cfg["TOPO"].getboolean("channel"): - # Compute cumulative distribution function and set horizontal extent of channel - c0 = norm.cdf(xv, 0, cFf) - cExtent = cRadius - yv = np.reshape(yv, (nRows, 1)) - - # if location within horizontal extent of channel, - # make half sphere shaped channel with radius given by channel horizontal extent - mask = np.zeros(np.shape(yv)) - mask[np.where(abs(yv) < cExtent)] = 1 - if cfg["TOPO"].getboolean("topoAdd"): - zv = zv + cExtent * c0 * (1.0 - np.sqrt(np.abs(1.0 - (np.square(yv) / (cExtent ** 2))))) * mask - mask = np.zeros(np.shape(yv)) - mask[np.where(abs(yv) >= cExtent)] = 1 - zv = zv + cExtent * c0 * mask - else: - zv = zv - cExtent * c0 * np.sqrt(np.abs(1.0 - (np.square(yv) / (cExtent ** 2)))) * mask - - # Log info here - log.info("Inclined plane coordinates computed") - - return x, y, zv - - -def addDrop(cfg, x, y, zv): - """Add a drop to a given topography - - The drop is added in the x drection - - Parameters - ---------- - cfg: configparser - configuration for generateTopo - x: 2D numpy array - x coordinate of the raster - y: 2D numpy array - y coordinate of the raster - zv: 2D numpy array - z coordinate of the raster - - Returns - ------- - zv: 2D numpy array - z coordinate of the raster taking the drop into account - """ - - # input parameters - dx, xEnd, yEnd = getGridDefs(cfg) - - xStartDrop = float(cfg["DROP"]["xStartDrop"]) - dxDrop = float(cfg["DROP"]["dxDrop"]) - alphaDrop = float(cfg["DROP"]["alphaDrop"]) - - # get zcoord - # deduce drop height from the drop steepness and length in x direction - dzDrop = dxDrop * np.tan(np.radians(alphaDrop)) - xEndDrop = xStartDrop + dxDrop - - nrows, ncols = np.shape(x) - # get the z coordinate of the point at the beginning of the drop - zIniDrop, _ = geoTrans.projectOnGrid( - xStartDrop * np.ones((nrows)), - y[:, 0], - np.vstack((zv[0, :], zv)), - csz=dx, - xllc=x[0, 0], - yllc=y[0, 0], - interp="bilinear", - ) - zIniDrop = np.tile(zIniDrop, (ncols, 1)).transpose() - # get the z coordinate of the point at the end of the drop - zEndDrop, _ = geoTrans.projectOnGrid( - xEndDrop * np.ones((nrows)), - y[:, 0], - np.vstack((zv[0, :], zv)), - csz=dx, - xllc=x[0, 0], - yllc=y[0, 0], - interp="bilinear", - ) - zEndDrop = np.tile(zEndDrop, (ncols, 1)).transpose() - # Set surface elevation from slope and max. elevation - zv = np.where( - ((x >= xStartDrop) & (x <= xEndDrop)), - zIniDrop - (x - xStartDrop) * np.tan(np.radians(alphaDrop)), - zv, - ) - zv = np.where(x > xEndDrop, zv - (dzDrop + zEndDrop - zIniDrop), zv) - - # Log info here - log.info("Added drop to the topography") - - return zv - - -def hockey(cfg): - """ - Compute coordinates of an inclined plane with a flat foreland defined by - total fall height z0, angle to flat foreland (meanAlpha) and a radius (rCirc) to - smooth the transition from inclined plane to flat foreland - """ - - # input parameters - rCirc = float(cfg["TOPO"]["rCirc"]) - meanAlpha = float(cfg["TOPO"]["meanAlpha"]) - z0 = float(cfg["TOPO"]["z0"]) - - cff = float(cfg["CHANNELS"]["cff"]) - cRadius = float(cfg["CHANNELS"]["cRadius"]) - cInit = float(cfg["CHANNELS"]["cInit"]) - cMustart = float(cfg["CHANNELS"]["cMustart"]) - cMuendFP = float(cfg["CHANNELS"]["cMuendFP"]) - - dx, xEnd, yEnd = getGridDefs(cfg) - - # Compute coordinate grid - xv, yv, zv, x, y, nRows, nCols = computeCoordGrid(dx, xEnd, yEnd) - - # Compute distance to flat foreland for given meanAlpha - x1 = z0 / np.tan(np.radians(meanAlpha)) - if x1 >= xEnd * 0.9: - log.warning( - "Your domain (xEnd) is to small or the slope angle (meanAlpha) to" - "shallow to produce a signifcant (>10 percent of domain, in your case:" - " %.2f m) flat foreland!" % (0.1 * (xEnd - dx)) - ) - - # Compute circle parameters for smoothing the transition - beta = 0.5 * (180.0 - (meanAlpha)) - xc = rCirc / np.tan(np.radians(beta)) - yc = xc * np.cos(np.radians(meanAlpha)) - xCirc = x1 + xc - # for plotting - d1 = np.tan(np.radians(beta)) * x1 - - # Set surface elevation - zv = np.zeros((nRows, nCols)) - mask = np.zeros(np.shape(x)) - mask[np.where(x < (x1 - yc))] = 1 - zv = zv + (z0 - np.tan(np.radians(meanAlpha)) * x) * mask - - mask = np.zeros(np.shape(x)) - mask[np.where(((x1 - yc) <= x) & (x <= (x1 + xc)))] = 1 - # rCirc + np.sqrt(rCirc**2 - (xv[m] - xCirc)**2) - zv = zv + (rCirc - np.sqrt(np.abs(rCirc ** 2 - (xCirc - x) ** 2))) * mask - - # If a channel shall be introduced - if cfg["TOPO"].getboolean("channel"): - # Compute cumulative distribution function - c1 for upper part (start) - # of channel and c2 for lower part (end) of channel - c1 = norm.cdf(xv, cMustart * (x1), cff) - c2 = 1.0 - norm.cdf(xv, cMuendFP * (x1), cff) - - # combine both into one function separated at the the middle of - # the channel longprofile location - mask = np.zeros(np.shape(xv)) - mask[np.where(xv < (x1 * (0.5 * (cMustart + cMuendFP))))] = 1 - c0 = c1 * mask - - mask = np.zeros(np.shape(xv)) - mask[np.where(xv >= (x1 * (0.5 * (cMustart + cMuendFP))))] = 1 - c0 = c0 + c2 * mask - - # Is the channel of constant width or narrowing - if cfg["TOPO"].getboolean("narrowing"): - cExtent = cInit * (1 - c0[:]) + (c0[:] * cRadius) - else: - cExtent = np.zeros(np.shape(xv)) + cRadius - - # Set surface elevation - mask = np.zeros(np.shape(y)) - mask[np.where(abs(y) < cExtent)] = 1 - # Add surface elevation modification introduced by channel - if cfg["TOPO"].getboolean("topoAdd"): - zv = zv + cExtent * c0 * (1.0 - np.sqrt(np.abs(1.0 - (np.square(y) / (cExtent ** 2))))) * mask - # outside of the channel, add layer of channel thickness - mask = np.zeros(np.shape(y)) - mask[np.where(abs(y) >= cExtent)] = 1 - zv = zv + cExtent * c0 * mask - else: - zv = zv - cExtent * c0 * np.sqrt(np.abs(1.0 - (np.square(y) / (cExtent ** 2)))) * mask - - # Log info here - log.info("Hockeystick coordinates computed") - - return x, y, zv +log = logging.getLogger("avaframe.debrisframe.in1Utils") def debrisFlowTopoAverage(cfg): """ @@ -300,14 +30,14 @@ def debrisFlowTopoAverage(cfg): cMuend = float(cfg["CHANNELS"]["cMuend"]) # Get grid definitons - dx, xEnd, yEnd = getGridDefs(cfg) + dx, xEnd, yEnd = genTop.getGridDefs(cfg) # Compute coordinate grid - xv, yv, zv, x, y, nRows, nCols = computeCoordGrid(dx, xEnd, yEnd) + xv, yv, zv, x, y, nRows, nCols = genTop.computeCoordGrid(dx, xEnd, yEnd) # If a channel shall be introduced # Get parabola Parameters - [A, B, fLen] = getParabolaParams(cfg) # fLen = x0 = 1679 m + [A, B, fLen] = genTop.getParabolaParams(cfg) # fLen = x0 = 1679 m # Set surface elevation mask = np.zeros(np.shape(xv)) @@ -367,385 +97,6 @@ def debrisFlowTopoAverage(cfg): return x, y, zv -def parabola(cfg): - """ - Compute coordinates of a parabolically-shaped slope with a flat foreland - defined by total fall height C, angle (meanAlpha) or distance (fLen) to flat foreland - """ - - C = float(cfg["TOPO"]["C"]) - cff = float(cfg["CHANNELS"]["cff"]) - cRadius = float(cfg["CHANNELS"]["cRadius"]) - cInit = float(cfg["CHANNELS"]["cInit"]) - cMustart = float(cfg["CHANNELS"]["cMustart"]) - cMuend = float(cfg["CHANNELS"]["cMuend"]) - - # Get grid definitons - dx, xEnd, yEnd = getGridDefs(cfg) - - # Compute coordinate grid - xv, yv, zv, x, y, nRows, nCols = computeCoordGrid(dx, xEnd, yEnd) - - # Get parabola Parameters - [A, B, fLen] = getParabolaParams(cfg) - - # Set surface elevation - zv = np.ones((nRows, nCols)) - # initialize superimposed channel - superChannel = np.zeros(np.shape(xv)) - superDam = np.zeros(np.shape(xv)) - - zv = zv * ((-(B ** 2)) / (4.0 * A) + C) - mask = np.zeros(np.shape(xv)) - mask[np.where(xv < fLen)] = 1 - - zv = zv + (A * xv ** 2 + B * xv + C) * mask - - # If a channel shall be introduced - if cfg["TOPO"].getboolean("channel"): - c1 = norm.cdf(xv, cMustart * fLen, cff) - c2 = 1.0 - norm.cdf(xv, cMuend * fLen, cff) - - # combine both into one function separated at the the middle of - # the channel longprofile location - mask = np.zeros(np.shape(xv)) - mask[np.where(xv < (fLen * (0.5 * (cMustart + cMuend))))] = 1 - c0 = c1 * mask - - mask = np.zeros(np.shape(xv)) - mask[np.where(xv >= (fLen * (0.5 * (cMustart + cMuend))))] = 1 - c0 = c0 + c2 * mask - - # Is the channel of constant width or narrowing - if cfg["TOPO"].getboolean("narrowing"): - cExtent = cInit * (1 - c0[:]) + (c0[:] * cRadius) - else: - cExtent = np.zeros(nCols) + cRadius - - # Add surface elevation modification introduced by channel - mask = np.zeros(np.shape(y)) - mask[np.where(abs(y) < cExtent)] = 1 - if cfg["TOPO"].getboolean("topoAdd"): - superChannel = ( - superChannel - + cExtent * c0 * (1.0 - np.sqrt(np.abs(1.0 - (np.square(y) / (cExtent ** 2))))) * mask - ) - # outside of the channel, add layer of channel thickness - mask = np.zeros(np.shape(y)) - mask[np.where(abs(y) >= cExtent)] = 1 - superChannel = superChannel + cExtent * c0 * mask - else: - superChannel = ( - superChannel - cExtent * c0 * np.sqrt(np.abs(1.0 - (np.square(y) / (cExtent ** 2)))) * mask - ) - - if cfg["TOPO"].getboolean("dam"): - damPos = cfg["DAMS"].getfloat("damPos") - damHeight = cfg["DAMS"].getfloat("damHeight") - damWidth = cfg["DAMS"].getfloat("damWidth") - superDam = norm.pdf(xv, damPos * (-B / 2 / A), damWidth) - superDam = superDam / np.max(superDam) * damHeight - if not cfg["TOPO"].getboolean("topoAdd"): - superDam = superDam - cExtent * c0 - - # add channel and dam - zv = zv + np.maximum(superDam, superChannel) - - # Log info here - log.info("Parabola with flat outrun coordinates computed") - - return x, y, zv - - -def parabolaRotation(cfg): - """ - Compute coordinates of a parabolically-shaped slope with a flat foreland - defined by total fall height C, angle (meanAlpha) or distance (fLen) to flat foreland - One parabolic slope in x direction, one sloped with 45° and one with 60° - """ - - C = float(cfg["TOPO"]["C"]) - fFlat = float(cfg["TOPO"]["fFlat"]) - - # Get grid definitons - dx, xEnd, yEnd = getGridDefs(cfg) - - # Compute coordinate grid, with center in (0, 0) - xv = np.arange(-0.5 * xEnd, 0.5 * (xEnd + dx), dx) - yv = np.arange(-0.5 * yEnd, 0.5 * (yEnd + dx), dx) - nRows = len(yv) - nCols = len(xv) - xv, yv = np.meshgrid(xv, yv) - zv = np.zeros((nRows, nCols)) - - # Get parabola Parameters - [A, B, fLen] = getParabolaParams(cfg) - - # Set surface elevation - zv = np.ones((nRows, nCols)) - zv = zv * ((-(B ** 2)) / (4.0 * A) + C) - # compute polar coordinates - r = np.sqrt(xv**2 + yv**2) - theta = np.arctan2(-yv, xv) - - # add parabola aligned with x (going from left to right) - phi = math.pi - # rotation of the polar coord system to be aligned with the parabola direction - s = createParabolaAxis(phi, theta, r, zv, fLen, fFlat) - - mask = np.ones(np.shape(s)) - mask[np.where(theta < 2 * math.pi / 3)] = 0 - mask[np.where(theta <= -5 * math.pi / 8)] = 1 - mask[np.where(s > fLen)] = 0 - zv = zv + (A * s ** 2 + B * s + C) * mask - - # add parabola sloped 60° with x - phi = math.pi / 3 - # rotation of the polar coord system to be aligned with the parabola direction - s = createParabolaAxis(phi, theta, r, zv, fLen, fFlat) - - mask = np.ones(np.shape(s)) - mask[np.where(theta > 2 * math.pi / 3)] = 0 - mask[np.where(theta < math.pi / 24)] = 0 - mask[np.where(s > fLen)] = 0 - zv = zv + (A * s ** 2 + B * s + C) * mask - - # add parabola aligned with x (going from left to right) - phi = -math.pi / 4 - # rotation of the polar coord system to be aligned with the parabola direction - s = createParabolaAxis(phi, theta, r, zv, fLen, fFlat) - - # apply the parabola to the corresponding part of the dem - mask = np.ones(np.shape(s)) - mask[np.where(theta > math.pi / 24)] = 0 - mask[np.where(theta < -5 * math.pi / 8)] = 0 - mask[np.where(s > fLen)] = 0 - zv = zv + (A * s ** 2 + B * s + C) * mask - - # Log info here - log.info("Triple parabola with flat foreland coordinates computed") - - return xv, yv, zv - - -def createParabolaAxis(phi, theta, r, zv, fLen, fFlat): - """create the s coordinate for a lined sloped from theta - phi from x axis""" - # rotation of the polar coord system to be aligned with the parabola direction - gamma = theta - phi - gamma = np.where(gamma < 0, gamma + 2 * math.pi, gamma) - gamma = np.where(gamma >= 2 * math.pi, gamma - 2 * math.pi, gamma) - # compute the s in the cartesian coord system aligned with the parabola - s = r * np.cos(gamma) - # shift this so that origin is at the top of the parabola - s = -s + fLen + fFlat - # apply the parabola to the corresponding part of the dem - return s - - -def bowl(cfg): - """Compute coordinates of sphere with given radius (rBwol)""" - - # input parameters - rBwol = float(cfg["TOPO"]["rBowl"]) - - # Get grid definitions - dx, xEnd, yEnd = getGridDefs(cfg) - - # Compute coordinate grid - xv, yv, zv, x, y, nRows, nCols = computeCoordGrid(dx, xEnd, yEnd) - - # recompute xv yv and x, y as they are shifted - xv = np.arange(-0.5 * xEnd, 0.5 * (xEnd + dx), dx) - yv = np.arange(-0.5 * yEnd, 0.5 * (yEnd + dx), dx) - x, y = np.meshgrid(xv, yv) - - # Set surface elevation - zv = rBwol * np.ones((nRows, nCols)) - if cfg["TOPO"].getboolean("curvedSlope"): - radius = np.sqrt(x**2) - else: - radius = np.sqrt(x**2 + y**2) - mask = np.zeros(np.shape(x)) - mask[np.where(radius <= rBwol)] = 1 - zv = zv - (rBwol * np.sqrt(np.abs(1 - (radius / rBwol) ** 2))) * mask - if cfg["TOPO"].getboolean("curvedSlope"): - zv[x >= 0] = 0.0 - - # Log info here - log.info("Bowl coordinates computed") - - return x, y, zv - - -def helix(cfg): - """Compute coordinates of helix-shaped topography with given radius (rHelix)""" - - # input parameters - rHelix = float(cfg["TOPO"]["rHelix"]) - C = float(cfg["TOPO"]["C"]) - cff = float(cfg["CHANNELS"]["cff"]) - cRadius = float(cfg["CHANNELS"]["cRadius"]) - cInit = float(cfg["CHANNELS"]["cInit"]) - cMustart = float(cfg["CHANNELS"]["cMustart"]) - cMuend = float(cfg["CHANNELS"]["cMuend"]) - - # Get grid definitions - dx, xEnd, yEnd = getGridDefs(cfg) - - # Compute coordinate grid - xv, yv, zv, x, y, nRows, nCols = computeCoordGrid(dx, xEnd, yEnd) - - # recompute xv yv and x, y as they are shifted - xv = np.arange(-0.5 * xEnd, 0.5 * (xEnd + dx), dx) - yv = np.arange(-yEnd, 0 + dx, dx) - x, y = np.meshgrid(xv, yv) - - # Get parabola Parameters - [A, B, fLen] = getParabolaParams(cfg) - - # Set surface elevation - zv = np.ones((nRows, nCols)) - radius = np.sqrt(x**2 + y**2) - theta = np.arctan2(y, x) + np.pi - zv = zv * ((-(B ** 2)) / (4.0 * A) + C) - mask = np.zeros(np.shape(x)) - mask[np.where((theta * rHelix) < fLen)] = 1 - - zv = zv + (A * (theta * rHelix) ** 2 + B * (theta * rHelix) + C) * mask - - # If channel is introduced to topography - if cfg["TOPO"].getboolean("channel"): - c0 = np.zeros(np.shape(x)) - mask = np.zeros(np.shape(x)) - mask[np.where((theta * rHelix) < (0.5 * (cMustart + cMuend) * fLen))] = 1 - c0 = c0 + norm.cdf(theta * rHelix, cMustart * fLen, cff) * mask - mask = np.zeros(np.shape(x)) - mask[np.where((theta * rHelix) >= (0.5 * (cMustart + cMuend) * fLen))] = 1 - c0 = c0 + (1.0 - norm.cdf(theta * rHelix, cMuend * fLen, cff)) * mask - # c0 = np.ones(np.shape(zv)) - # If channel of constant width or becoming narrower in the middle - if cfg["TOPO"].getboolean("narrowing"): - cExtent = cInit * (1.0 - c0) + c0 * cRadius - else: - cExtent = cRadius - - if cfg["TOPO"].getboolean("topoAdd"): - zv = zv + c0 * cExtent - - # Inner and outer boundaries of the channel - boundIn = rHelix - cExtent - boundExt = rHelix + cExtent - - # Set channel - mask = np.zeros(np.shape(x)) - mask[np.where((radius >= rHelix) & (radius < boundExt))] = 1 - radius1 = radius - rHelix - zv = zv - cExtent * c0 * np.sqrt(np.abs(1.0 - (np.square(radius1) / np.square(cExtent)))) * mask - - mask = np.zeros(np.shape(x)) - mask[np.where((radius < rHelix) & (radius > boundIn))] = 1 - radius2 = rHelix - radius - zv = zv - cExtent * c0 * np.sqrt(np.abs(1.0 - (np.square(radius1) / np.square(cExtent)))) * mask - - # set last row at Center to fall height - indCols = int(0.5 * nCols) - zv[-1, 0:indCols] = C - - # Log info here - log.info("Helix coordinates computed") - - return x, y, zv - - -def pyramid(cfg): - """Generate a pyramid topography - in this case rectangular domain""" - - # get parameters from ini - meanAlpha = float(cfg["TOPO"]["meanAlpha"]) - z0 = float(cfg["TOPO"]["z0"]) - flatx = float(cfg["TOPO"]["flatx"]) - flaty = float(cfg["TOPO"]["flaty"]) - phi = float(cfg["TOPO"]["phi"]) - dx = float(cfg["TOPO"]["dx"]) - - # initialise pyramid corners and center point - points = np.asarray([[-1.0, -1.0, 0], [-1.0, 1.0, 0], [1.0, 1.0, 0], [1.0, -1, 0.0], [0.0, 0.0, 1.0]]) - dxPoints = abs(points[4, 0] - points[1, 0]) - - # compute elevation of the apex point for given angle of pyramid facets - zAlpha = dxPoints * np.tan(np.deg2rad(meanAlpha)) - points[4, 2] = zAlpha - dcoors = points * z0 / zAlpha - - # if desired rotate pyramid - if cfg["TOPO"].getboolean("flagRot"): - dcoorsRot = np.zeros((len(dcoors), 3)) - for m in range(len(dcoorsRot)): - dcoorsRot[m, 0] = np.cos(np.deg2rad(phi)) * dcoors[m, 0] - np.sin(np.deg2rad(phi)) * dcoors[m, 1] - dcoorsRot[m, 1] = np.sin(np.deg2rad(phi)) * dcoors[m, 0] + np.cos(np.deg2rad(phi)) * dcoors[m, 1] - dcoorsRot[m, 2] = dcoors[m, 2] - dcoorsFin = dcoorsRot - else: - dcoorsFin = dcoors - - # split into horizontal and vertical coordinate points - xyPoints = np.zeros((len(points), 2)) - xyPoints[:, 0] = dcoorsFin[:, 0] - xyPoints[:, 1] = dcoorsFin[:, 1] - zPoints = dcoorsFin[:, 2] - - # make meshgrid for final DEM - xv = np.arange(-flatx + np.amin(dcoorsFin[:, 0]), np.amax(dcoorsFin[:, 0]) + flatx, dx) - yv = np.arange(-flaty + np.amin(dcoorsFin[:, 1]), np.amax(dcoorsFin[:, 1]) + flaty, dx) - x, y = np.meshgrid(xv, yv) - - # interpolate appex point information to meshgrid - z = griddata(xyPoints, zPoints, (x, y), method="linear") - zNan = np.isnan(z) - z[zNan] = 0.0 - - dX = np.amax(dcoorsFin[:, 0]) + flatx - (-flatx + np.amin(dcoorsFin[:, 0])) - dY = np.amax(dcoorsFin[:, 1]) + flaty - (-flaty + np.amin(dcoorsFin[:, 1])) - log.info("domain extent pyramid- inx: %f, in y: %f" % (dX, dY)) - - return x, y, z - - -def writeDEM(cfg, z, outDir): - """Write topography information to file""" - nameExt = cfg["TOPO"]["demType"] - nRows = z.shape[0] - nCols = z.shape[1] - - # Read lower left center coordinates, cellsize and noDATA value - xllcenter = float(cfg["DEMDATA"]["xl"]) - yllcenter = float(cfg["DEMDATA"]["yl"]) - cellsize = float(cfg["TOPO"]["dx"]) - noDATA = float(cfg["DEMDATA"]["nodata_value"]) - demName = cfg["DEMDATA"]["demName"] - - # Save elevation data to file and add header lines - demFile = outDir / ("%s_%s_Topo" % (demName, nameExt)) - demHeader = { - "ncols": nCols, - "nrows": nRows, - "xllcenter": xllcenter, - "yllcenter": yllcenter, - "cellsize": cellsize, - "nodata_value": noDATA, - } - - transform = IOf.transformFromASCHeader(demHeader) - demHeader["transform"] = transform - demHeader["driver"] = "AAIGrid" - demHeader["crs"] = CRS() - - IOf.writeResultToRaster(demHeader, z, demFile, flip=False) - - # Log info here - log.info("DEM written to: %s/%s_%s_Topo" % (outDir, demName, nameExt)) - - def generateTopo(cfg, avalancheDir): """Compute coordinates of desired topography with given inputs""" @@ -766,35 +117,35 @@ def generateTopo(cfg, avalancheDir): # Call topography type if demType == "FP": - [x, y, z] = flatplane(cfg) + [x, y, z] = genTop.flatplane(cfg) elif demType == "IP": - [x, y, z] = inclinedplane(cfg) + [x, y, z] = genTop.inclinedplane(cfg) elif demType == "PF": - [x, y, z] = parabola(cfg) + [x, y, z] = genTop.parabola(cfg) elif demType == "TPF": - [x, y, z] = parabolaRotation(cfg) + [x, y, z] = genTop.parabolaRotation(cfg) elif demType == "HS": - [x, y, z] = hockey(cfg) + [x, y, z] = genTop.hockey(cfg) elif demType == "BL": - [x, y, z] = bowl(cfg) + [x, y, z] = genTop.bowl(cfg) elif demType == "HX": - [x, y, z] = helix(cfg) + [x, y, z] = genTop.helix(cfg) elif demType == "PY": - [x, y, z] = pyramid(cfg) + [x, y, z] = genTop.pyramid(cfg) elif demType == "DFTA": [x, y, z] = debrisFlowTopoAverage(cfg) # If a drop shall be introduced if cfg["TOPO"].getboolean("drop"): - z = addDrop(cfg, x, y, z) + z = genTop.addDrop(cfg, x, y, z) # moves topo in z direction if cfg["DEMDATA"]["zEdit"] != "": @@ -802,6 +153,6 @@ def generateTopo(cfg, avalancheDir): log.info("Changed topo elevation by %.2f" % cfg["DEMDATA"].getfloat("zEdit")) # Write DEM to file - writeDEM(cfg, z, outDir) + genTop.writeDEM(cfg, z, outDir) return (z, demType, outDir) From 718f5cc58e9498ab1592c91fb429aa560b4970ab Mon Sep 17 00:00:00 2001 From: JuLa96 <206707905+JuLa96@users.noreply.github.com> Date: Thu, 12 Feb 2026 12:14:24 +0100 Subject: [PATCH 3/5] Update out1Plot/outTopo.py --- debrisframe/out1Plot/outTopo.py | 113 +------------------------------- 1 file changed, 3 insertions(+), 110 deletions(-) diff --git a/debrisframe/out1Plot/outTopo.py b/debrisframe/out1Plot/outTopo.py index f7ab130..dcbe69d 100644 --- a/debrisframe/out1Plot/outTopo.py +++ b/debrisframe/out1Plot/outTopo.py @@ -2,97 +2,15 @@ Simple plotting for DEMs """ -import matplotlib.pyplot as plt -from mpl_toolkits.mplot3d import Axes3D -import os -import numpy as np import logging -from matplotlib.colors import LightSource -from matplotlib import cm -from scipy.interpolate import griddata # local imports -from avaframe.in1Data import getInput from avaframe.in3Utils import geoTrans import avaframe.out3Plot.plotUtils as pU +import avaframe.out3Plot.outTopo as oT # create local logger -log = logging.getLogger(__name__) - - -def _generateDEMPlot(X, Y, z, title): - """Generates 3d DEM plot, use this to style the plot""" - - fig = plt.figure(figsize=(10, 10)) - ax = plt.axes(projection='3d') - - ls = LightSource(270, 45) - # To use a custom hillshading mode, override the built-in shading and pass - # in the rgb colors of the shaded surface calculated from "shade". - rgb = ls.shade(z, cmap=cm.viridis, vert_exag=0.1, blend_mode='soft') - surf = ax.plot_surface(X, Y, z, rstride=1, cstride=1, facecolors=rgb, - linewidth=0, antialiased=False, shade=False) - - # These are other options to plot in 3d in case another look is needed - # ax.contour(X, Y, z, 20, linewidth=3, colors="g", linestyles="solid") - # ax.plot_wireframe(X, Y, z, rstride=100, cstride=100,lw=1) - # ax.plot_surface(X, Y, z, cmap=plt.cm.viridis, - # linewidth=0, antialiased=False) - - ax.set_title('DEM: %s' % title) - ax.set_xlabel('x [m]') - ax.set_ylabel('y [m]') - ax.set_zlabel('elevation (m)') - - return ax, fig - - -def plotDEM3D(cfg, showPlot = False): - """Plots the DEM from the avalancheDir in cfg alongside it - - Parameters - ---------- - cfg : configparser object - the main configuration - showPlot : boolean - If true shows the matplotlib plot - - """ - - # get avalanche dir - avalancheDir = cfg['MAIN']['avalancheDir'] - avaName = os.path.basename(avalancheDir) - log.info('Plotting DEM in : %s', avalancheDir) - - # read DEM - dem = getInput.readDEM(avalancheDir) - - # get DEM Path - demPath = getInput.getDEMPath(avalancheDir) - - header = dem['header'] - xl = header['xllcenter'] - yl = header['yllcenter'] - ncols = header['ncols'] - nrows = header['nrows'] - dx = header['cellsize'] - z = dem['rasterData'] - - # this line is needed for plot_surface to be able to handle the nans - z[np.isnan(z)] = np.nanmin(z) - - # Set coordinate grid with given origin - X, Y = geoTrans.makeCoordinateGrid(xl, yl, dx, ncols, nrows) - - ax, fig = _generateDEMPlot(X, Y, z, avaName) - - # Save figure to file - outName = demPath.stem + '_plot' - - # save and or show figure - plotPath = pU.saveAndOrPlot({'pathResult': avalancheDir}, outName, fig) - log.info('Saving plot to: %s', plotPath) - +log = logging.getLogger("avaframe.debrisframe.out1Plot") def plotGeneratedDEM(z, nameExt, cfg, outDir, cfgMain): """ Plot DEM with given information on the origin of the DEM """ @@ -113,7 +31,7 @@ def plotGeneratedDEM(z, nameExt, cfg, outDir, cfgMain): topoNames = {'IP': 'inclined Plane', 'FP': 'flat plane', 'PF': 'parabola flat', 'TPF': 'triple parabola flat', 'HS': 'Hockeystick smoothed', 'BL': 'bowl', 'HX': 'Helix', 'PY': 'Pyramid', 'DFTA': 'generic debris-flow topography'} - ax, fig = _generateDEMPlot(X, Y, z, topoNames[nameExt]) + ax, fig = oT._generateDEMPlot(X, Y, z, topoNames[nameExt]) # Save figure to file outName = '%s_%s_plot' % (demName, nameExt) @@ -121,28 +39,3 @@ def plotGeneratedDEM(z, nameExt, cfg, outDir, cfgMain): plotPath = pU.saveAndOrPlot({'pathResult': outDir}, outName, fig) log.info('Saving plot to: %s', plotPath) - -def plotReleasePoints(xv, yv, xyPoints, demType): - fig1, ax = plt.subplots(ncols=2, nrows=1, figsize=(pU.figW * 3, pU.figH * 2)) - ax[0].plot(xv, np.zeros(len(xv)) + yv[0], "k-") - ax[0].plot(xv, np.zeros(len(xv)) + yv[-1], "k-") - ax[0].plot(np.zeros(len(yv)) + xv[0], yv, "k-") - ax[0].plot(np.zeros(len(yv)) + xv[-1], yv, "k-") - ax[0].plot(xyPoints[:, 0], xyPoints[:, 1], "r*") - ax[0].set_title("Domain and release area of %s - projected" % demType) - ax[0].set_xlabel("along valley [m]") - ax[0].set_ylabel("across valley [m]") - - ax[1].plot(xyPoints[:, 0], xyPoints[:, 1], "r*") - ax[1].set_xlabel("along valley [m]") - ax[1].set_ylabel("across valley [m]") - - xLength = abs(xyPoints[0, 0] - xyPoints[3, 0]) - yLength = abs(xyPoints[0, 1] - xyPoints[1, 1]) - infoText = "xLength: %.2f \n yLength: %.2f \n" % (xLength, yLength) - props = dict(boxstyle="round", facecolor='white') - ax[1].text( - 0.5, 0.5, infoText, transform=ax[1].transAxes, fontsize=14, verticalalignment="top", bbox=props - ) - - plt.show() From 09b85143b827a01903fb70e185df4865ed1f8cd9 Mon Sep 17 00:00:00 2001 From: JuLa96 <206707905+JuLa96@users.noreply.github.com> Date: Thu, 12 Feb 2026 12:14:57 +0100 Subject: [PATCH 4/5] Update runScripts/runGenerateTopo.py --- debrisframe/runScripts/runGenerateTopo.py | 32 +++++++++++------------ 1 file changed, 15 insertions(+), 17 deletions(-) diff --git a/debrisframe/runScripts/runGenerateTopo.py b/debrisframe/runScripts/runGenerateTopo.py index 633fa66..0a1cb48 100644 --- a/debrisframe/runScripts/runGenerateTopo.py +++ b/debrisframe/runScripts/runGenerateTopo.py @@ -2,7 +2,6 @@ """ Run script for generateTopo in module in3Utils """ -import logging import pathlib # Local imports @@ -10,24 +9,23 @@ from debrisframe.out1Plot import outTopo as oT from avaframe.in3Utils import cfgUtils, logUtils +if __name__ == '__main__': + # log file name; leave empty to use default runLog.log + logName = 'generateTopo' -# log file name; leave empty to use default runLog.log -logName = 'generateTopo' + # Load avalanche directory from general configuration file + cfgMain = cfgUtils.getGeneralConfig(nameFile=pathlib.Path("debrisframeCfg.ini")) + debrisDir = cfgMain["MAIN"]["avalancheDir"] -# Load avalanche directory from general configuration file -cfgMain = cfgUtils.getGeneralConfig(nameFile=pathlib.Path("debrisframeCfg.ini")) -debrisDir = cfgMain["MAIN"]["avalancheDir"] -# avalancheDir = cfgMain['MAIN']['avalancheDir'] + # Start logging + log = logUtils.initiateLogger(debrisDir, logName) + log.info('MAIN SCRIPT') -# Start logging -log = logUtils.initiateLogger(debrisDir, logName) -log.info('MAIN SCRIPT') + # Load input parameters from configuration file + cfg = cfgUtils.getModuleConfig(gT, debrisDir) -# Load input parameters from configuration file -cfg = cfgUtils.getModuleConfig(gT, debrisDir) + # Call main function to generate DEMs + [z, name_ext, outDir] = gT.generateTopo(cfg, debrisDir) -# Call main function to generate DEMs -[z, name_ext, outDir] = gT.generateTopo(cfg, debrisDir) - -# Plot new topogrpahy -oT.plotGeneratedDEM(z, name_ext, cfg, outDir, cfgMain) + # Plot new topogrpahy + oT.plotGeneratedDEM(z, name_ext, cfg, outDir, cfgMain) From 93b631f347da54a6b719e61edc0bc9d5e55f7eba Mon Sep 17 00:00:00 2001 From: JuLa96 <206707905+JuLa96@users.noreply.github.com> Date: Thu, 12 Feb 2026 13:49:09 +0100 Subject: [PATCH 5/5] Update in1Utils/generateTopo.py --- debrisframe/in1Utils/generateTopo.py | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/debrisframe/in1Utils/generateTopo.py b/debrisframe/in1Utils/generateTopo.py index a443a03..c2605ed 100644 --- a/debrisframe/in1Utils/generateTopo.py +++ b/debrisframe/in1Utils/generateTopo.py @@ -20,6 +20,11 @@ def debrisFlowTopoAverage(cfg): Compute coordinates of an average parabolic-shaped slope as a generic topography for debris-flow simulations defined by a 2nd-degree polynomial: ax**2 + bx + c The parameters of the polynomial function are derived from real watershed profiles (Kessler 2019) + + Parameters + ---------------------- + cfg: configparser Object + configuration setup for topo generation """ # input parameters C = float(cfg["TOPO"]["C"]) # C = 709 m @@ -98,7 +103,16 @@ def debrisFlowTopoAverage(cfg): return x, y, zv def generateTopo(cfg, avalancheDir): - """Compute coordinates of desired topography with given inputs""" + """ + Compute coordinates of desired topography with given inputs + + Parameters + ---------------------- + cfg: configparser Object + configuration setup for topo generation + debrisDir: string + directory to data folder + """ # Which DEM type demType = cfg["TOPO"]["demType"]