From 813306639053eeb7055b6e6467c048f49313d097 Mon Sep 17 00:00:00 2001
From: Meikel Bisping
Date: Mon, 20 Jan 2025 11:35:09 +0100
Subject: [PATCH] Auch von Masken KettleJobs ausfuehren lassen

(cherry picked from commit 460037bf0e86b5ff044d3fd29ad4a4f276abe8be)
---
 src/de/superx/bin/fm/EtlStarter.java | 134 ++++++++++++++++++---------
 1 file changed, 90 insertions(+), 44 deletions(-)

diff --git a/src/de/superx/bin/fm/EtlStarter.java b/src/de/superx/bin/fm/EtlStarter.java
index 2d32f61..f57222a 100644
--- a/src/de/superx/bin/fm/EtlStarter.java
+++ b/src/de/superx/bin/fm/EtlStarter.java
@@ -4,6 +4,10 @@ import java.io.File;
 import java.io.IOException;
 import java.io.StringWriter;
 import java.io.Writer;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
@@ -40,9 +44,11 @@ public class EtlStarter implements TemplateTransformModel {
 	private static Logger logger = Logger.getLogger(EtlStarter.class);
 	private String mandantenId = "default";
 	private LogLevel logLevel = LogLevel.BASIC;
-	public EtlStarter(String mandantenId) {
-		this.mandantenId = mandantenId;
-	}
+
+	public EtlStarter(String mandantenId) {
+		this.mandantenId = mandantenId;
+	}
+
 	@Override
 	public Writer getWriter(Writer paramWriter, Map paramMap) throws TemplateModelException, IOException {
 		logger.info("**EtlStarter**");
@@ -61,11 +67,11 @@ public class EtlStarter implements TemplateTransformModel {
 		paramMap.remove(PARAM_KEY_COMP);
 		paramMap.remove(PARAM_KEY_STEP);
 		paramMap.remove(PARAM_LOGLEVEL);
-		
+
 		if (log_levelSc != null) {
 			this.logLevel = LogLevel.valueOf(log_levelSc.getAsString());
 		}
-		
+
 		Map params = new HashMap();
 		for (Object key : paramMap.keySet()) {
 			SimpleScalar value = (SimpleScalar) paramMap.get(key);
@@ -73,7 +79,49 @@ public class EtlStarter implements TemplateTransformModel {
 			logger.info("PARAM: " + key + " -> " + value.getAsString());
 		}
 
-		String moduleDir=SuperXManager.getModuleDir()+ File.separator + component + File.separator;
+		String moduleDir = SuperXManager.getModuleDir();
+		if (moduleDir == null || moduleDir.equals("")) {
+			if (System.getProperty("MODULE_PFAD") != null && !System.getProperty("MODULE_PFAD").toString().equals(""))
+				moduleDir = System.getProperty("MODULE_PFAD").toString();
+			else
+				throw new IOException(
+						"Module-Pfad kann nicht ermittelt werden, bitte setzen Sie den JVM Parameter -DMODULE_PFAD=...");
+		}
+		String jobFilePath = null;
+		if (component.contentEquals("manual")) {
+			jobFilePath = getFilePathFromDatabase(etl_step, moduleDir);
+		} else {
+			jobFilePath = getFilePathFromModule(component, etl_step, moduleDir);
+		}
+		logger.info("Kettle job: " + jobFilePath);
+		kettleCallEmbedded(jobFilePath, params);
+		return null;
+	}
+
+	private String getFilePathFromDatabase(String etl_step, String moduleDir) throws IOException {
+		String jobFilePath = null;
+		try (Connection con = SxPools.get(mandantenId).getConnection();
+				PreparedStatement pst = con.prepareStatement("select filepath from sx_jobs where uniquename=?")) {
+			pst.setString(1, etl_step);
+			ResultSet rs = pst.executeQuery();
+			while (rs.next()) {
+				jobFilePath = rs.getString(1);
+			}
+			rs.close();
+		} catch (SQLException e) {
+			e.printStackTrace();
+			throw new IOException("Fehler beim Auslesen von sx_jobs mit uniquename " + etl_step + " " + e);
+		}
+		if (jobFilePath == null) {
+			throw new IOException(
+					"Fehler beim Auslesen von sx_jobs mit uniquename " + etl_step + " Kein Eintrag gefunden");
+		}
+		return moduleDir + File.separator + jobFilePath;
+	}
+
+	private String getFilePathFromModule(String component, String etl_step, String moduleDir)
+			throws TemplateModelException {
+		moduleDir += File.separator + component + File.separator;
 		if (!(new File(moduleDir)).exists()) {
 			throw new TemplateModelException("Component not found: " + component);
 		}
@@ -96,44 +144,39 @@ public class EtlStarter implements TemplateTransformModel {
 		}
 		String fileAttr = etl.attributeValue("file");
 		String jobFilePath = moduleDir + fileAttr.substring(fileAttr.indexOf('/') + 1);
-		logger.info("Kettle job: " + jobFilePath);
-		kettleCallEmbedded(jobFilePath, params);
-		return null;
+		return jobFilePath;
 	}
 
 	/**
 	 * Abarbeiten des Kettle Jobs mit kettle (Embedded).
 	 *
-	 * @param jobfile
-	 *            Dateiname des Jobs
-	 * @param jobtype
-	 *            ktr=Transformation, kjb=Job
-	 * @param params
-	 *            Parameter
+	 * @param jobfile Dateiname des Jobs
+	 * @param jobtype ktr=Transformation, kjb=Job
+	 * @param params  Parameter
 	 * @throws Exception
 	 */
 	final StringBuffer kettleCallEmbedded(final String jobfile, final Map jobParams) {
-		return kettleCallEmbedded("default",jobfile, jobParams, true);
+		return kettleCallEmbedded("default", jobfile, jobParams, true);
 	}
 
 	/**
 	 * Abarbeiten des Kettle Jobs mit kettle (Embedded).
 	 *
-	 * @param jobfile
-	 *            Dateiname des Jobs
-	 * @param params
-	 *            Parameter
-	 * @param isPostgres - für Metainformationen, wenn false dann wird Informix genommen
-	 * @return StringBuffer mit Logging für Ausgabe im Browser
+	 * @param jobfile    Dateiname des Jobs
+	 * @param params     Parameter
+	 * @param isPostgres - für Metainformationen, wenn false dann wird Informix
+	 *                   genommen
+	 * @return StringBuffer mit Logging für Ausgabe im Browser
 	 * @throws Exception
 	 */
-	public final StringBuffer kettleCallEmbedded(final String mandantenID,final String jobfile, final Map jobParams, boolean isPostgres) {
+	public final StringBuffer kettleCallEmbedded(final String mandantenID, final String jobfile,
+			final Map jobParams, boolean isPostgres) {
 		StringWriter writer = new StringWriter();
-		WriterAppender appender = new WriterAppender( new SimpleLayout(), writer );
-		org.apache.log4j.Level oldLevel=logger.getLevel();
+		WriterAppender appender = new WriterAppender(new SimpleLayout(), writer);
+		org.apache.log4j.Level oldLevel = logger.getLevel();
 		logger.setLevel(org.apache.log4j.Level.ALL);
-		logger.addAppender( appender );
-		
+		logger.addAppender(appender);
+
 		try {
 			// usually we initialize kettle env in EdustoreManager
 			// but this may not have happened if HIS1 not running
@@ -146,26 +189,29 @@ public class EtlStarter implements TemplateTransformModel {
 			}
 			JndiUtil.initJNDI();
 			MemoryMetaStore metastore = new MemoryMetaStore();
-			//Dies hatte für Mandantenbetrieb nicht funktioniert, daher Umstellung auf SxPool , s.u.
-			//DataSourceProviderFactory.setDataSourceProviderInterface(new KettleDataSourceProvider(mandantenID,logger));
+			// Dies hatte für Mandantenbetrieb nicht funktioniert, daher Umstellung auf
+			// SxPool , s.u.
+			// DataSourceProviderFactory.setDataSourceProviderInterface(new
+			// KettleDataSourceProvider(mandantenID,logger));
 			/*
-			 * get all db connections configured in databases.xml as
-			 * DataSource's by name
+			 * get all db connections configured in databases.xml as DataSource's by name
 			 */
-			
+
-			// Enumeration dbNames =
+			// Enumeration dbNames =
 			// this.dbhandlerPool.getLogicalDatabaseNames();
 			List dbNames = Arrays.asList(new String[] { "eduetl" });
 			for (String dbName : dbNames) {
-				// Dies hatte für Mandantenbetrieb nicht funktioniert, daher Umstellung auf SxPool , s.u.
-				
-				//DatabaseMeta dbmeta = new DatabaseMeta(dbName, isPostgres ? "POSTGRESQL" : "INFORMIX", "JNDI", null, dbName, "1521", null, null);
-				DatabaseMeta dbmeta=SxPools.get(mandantenID).getKettleDatabaseMeta();
-				DatabaseMetaStoreUtil.createDatabaseElement(metastore, dbmeta);
-				
-				logger.info("Init pdi database connection " + dbName+" (DBName:"+dbmeta.getDatabaseName()+")");
+				// Dies hatte für Mandantenbetrieb nicht funktioniert, daher Umstellung auf
+				// SxPool , s.u.
+
+				// DatabaseMeta dbmeta = new DatabaseMeta(dbName, isPostgres ? "POSTGRESQL" :
+				// "INFORMIX", "JNDI", null, dbName, "1521", null, null);
+				DatabaseMeta dbmeta = SxPools.get(mandantenID).getKettleDatabaseMeta();
+				DatabaseMetaStoreUtil.createDatabaseElement(metastore, dbmeta);
+
+				logger.info("Init pdi database connection " + dbName + " (DBName:" + dbmeta.getDatabaseName() + ")");
 			}
-			
+
 			JobMeta jobMeta = new JobMeta(null, jobfile, null, metastore, null);
 			org.pentaho.di.job.Job job = new org.pentaho.di.job.Job(null, jobMeta);
 			job.setLogLevel(this.logLevel);
@@ -204,16 +250,16 @@ public class EtlStarter implements TemplateTransformModel {
 			logger.removeAppender(appender);
 			logger.setLevel(oldLevel);
 		}
-		
+
 		return writer.getBuffer();
 	}
-	
+
 	public static void main(String args[]) {
 		String jobfile = "file:///home/superx/data-integration/exceltest.kjb";
 		Map jobParams = new HashMap();
 		jobParams.put("PATH_TO_EXCELFILE", "/home/superx/tmp/testexcel2.xlsx");
 
 		EtlStarter es = new EtlStarter("default");
-		es.kettleCallEmbedded("default",jobfile, jobParams,false);
+		es.kettleCallEmbedded("default", jobfile, jobParams, false);
 	}
 }
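Note (not part of the patch): the new "manual" branch resolves the job file by looking up sx_jobs.filepath via uniquename and prepending the module directory (MODULE_PFAD). The following is a minimal sketch of how such an entry could be registered; it assumes only the two sx_jobs columns actually used by the query above (uniquename, filepath), and the class name, job name, path, JDBC URL and credentials are hypothetical placeholders.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

// Hypothetical helper, not part of the patch: registers a Kettle job in sx_jobs so that
// EtlStarter can resolve it when called with component "manual" and etl_step = uniquename.
public class RegisterManualJob {

	public static void main(String[] args) throws Exception {
		// The stored filepath is relative to the module directory, because
		// getFilePathFromDatabase() returns moduleDir + File.separator + filepath.
		String uniquename = "excel_import";       // hypothetical job name
		String filepath = "etl/excel_import.kjb"; // hypothetical path relative to MODULE_PFAD

		// Placeholder connection data; inside the server EtlStarter itself uses the SxPools pool.
		try (Connection con = DriverManager.getConnection("jdbc:postgresql://localhost/eduetl", "superx", "secret");
				PreparedStatement pst = con
						.prepareStatement("insert into sx_jobs (uniquename, filepath) values (?, ?)")) {
			pst.setString(1, uniquename);
			pst.setString(2, filepath);
			pst.executeUpdate();
		}
	}
}

With such a row in place, a mask can trigger the job by passing component "manual" and the uniquename as etl_step; getWriter() then hands the resolved path to kettleCallEmbedded() exactly as it does for module-based jobs.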