package de.superx.bin.fm; import static de.superx.servlet.SxSQL_Server.DEFAULT_MANDANTEN_ID; import java.io.File; import java.io.IOException; import java.io.StringWriter; import java.io.Writer; import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; import org.apache.log4j.Level; import org.apache.log4j.Logger; import org.apache.log4j.SimpleLayout; import org.apache.log4j.WriterAppender; import org.dom4j.Document; import org.dom4j.DocumentException; import org.dom4j.Element; import org.dom4j.io.SAXReader; import org.pentaho.di.core.Result; import org.pentaho.di.core.database.DatabaseMeta; import org.pentaho.di.core.logging.KettleLogStore; import org.pentaho.di.core.logging.KettleLoggingEvent; import org.pentaho.di.core.logging.LogLevel; import org.pentaho.di.job.Job; import org.pentaho.di.job.JobEntryListener; import org.pentaho.di.job.JobMeta; import org.pentaho.di.job.entry.JobEntryCopy; import org.pentaho.di.job.entry.JobEntryInterface; import org.pentaho.di.metastore.DatabaseMetaStoreUtil; import org.pentaho.metastore.stores.memory.MemoryMetaStore; import de.superx.servlet.SuperXManager; import de.superx.servlet.SxPools; import de.superx.spring.batch.His1DataSources; import freemarker.template.SimpleScalar; import freemarker.template.TemplateModelException; import freemarker.template.TemplateTransformModel; public class EtlStarter implements TemplateTransformModel { public static final String PARAM_KEY_COMP = "component"; public static final String PARAM_KEY_STEP = "etl_step"; public static final String PARAM_LOGLEVEL = "log_level"; static Logger logger = Logger.getLogger(EtlStarter.class); private LogLevel logLevel = LogLevel.MINIMAL; private String mandantenId = DEFAULT_MANDANTEN_ID; public EtlStarter(String mandantenId) { this.mandantenId = mandantenId; } @Override public Writer getWriter(Writer paramWriter, Map paramMap) throws TemplateModelException, IOException { logger.info("**EtlStarter** for mandantenID " + this.mandantenId); SimpleScalar componentSc = ((SimpleScalar) paramMap.get(PARAM_KEY_COMP)); SimpleScalar etl_stepSc = (SimpleScalar) paramMap.get(PARAM_KEY_STEP); SimpleScalar log_levelSc = (SimpleScalar) paramMap.get(PARAM_LOGLEVEL); String component = componentSc.getAsString(); String etl_step = etl_stepSc.getAsString(); if (component == null || component.isEmpty()) { throw new TemplateModelException("Missing parameter " + PARAM_KEY_COMP); } if (etl_step == null || etl_step.isEmpty()) { throw new TemplateModelException("Missing parameter " + PARAM_KEY_STEP); } // check for additional params which will be passed to kettle job paramMap.remove(PARAM_KEY_COMP); paramMap.remove(PARAM_KEY_STEP); paramMap.remove(PARAM_LOGLEVEL); if (log_levelSc != null) { this.logLevel = LogLevel.valueOf(log_levelSc.getAsString()); } Map params = new HashMap(); for (Object key : paramMap.keySet()) { SimpleScalar value = (SimpleScalar) paramMap.get(key); params.put((String) key, value.getAsString()); logger.info("PARAM: " + key + " -> " + value.getAsString()); } String moduleDir = SuperXManager.getModuleDir(); if(moduleDir==null || moduleDir.equals("")) { if (System.getProperty("MODULE_PFAD") != null && !System.getProperty("MODULE_PFAD").toString().equals("")) moduleDir = System.getProperty("MODULE_PFAD").toString(); else throw new IOException("Module-Pfad kann nicht ermittelt werden, bitte setzen Sie den JVM Parameter -DMODULE_PFAD=..."); } String jobFilePath = null; if (component.contentEquals("manual")) { jobFilePath =getFilePathFromDatabase(etl_step,moduleDir); } else { jobFilePath = getFilePathFromModule(component, etl_step, moduleDir); } logger.info("Kettle job: " + jobFilePath); kettleCallEmbedded(jobFilePath, params, true); return null; } private String getFilePathFromDatabase(String etl_step,String moduleDir) throws IOException { String jobFilePath = null; try (Connection con = SxPools.get(mandantenId).getConnection(); PreparedStatement pst = con.prepareStatement("select filepath from sx_jobs where uniquename=?")) { pst.setString(1, etl_step); ResultSet rs=pst.executeQuery(); while (rs.next()) { jobFilePath=rs.getString(1); } rs.close(); } catch (SQLException e) { e.printStackTrace(); throw new IOException("Fehler beim Auslesen von sx_jobs mit uniquename "+etl_step+" "+e); } if (jobFilePath==null) { throw new IOException("Fehler beim Auslesen von sx_jobs mit uniquename "+etl_step+" Kein Eintrag gefunden"); } return moduleDir+File.separator+jobFilePath; } private String getFilePathFromModule(String component, String etl_step, String moduleDir) throws TemplateModelException { moduleDir += File.separator + component + File.separator; if (!(new File(moduleDir)).exists()) { throw new TemplateModelException("Component not found: " + component); } File componentXml = new File(moduleDir + File.separator + "conf" + File.separator + component + ".xml"); if (!componentXml.canRead()) { throw new TemplateModelException("Cannot read component xml for " + component); } SAXReader reader = new SAXReader(); Element etl; try { Document document = reader.read(componentXml); etl = (Element) document.selectSingleNode("//module/etl/etl-step[@id='" + etl_step + "']/action/kettle-job-embedded"); } catch (DocumentException e) { throw new TemplateModelException(e); } if (etl == null) { throw new TemplateModelException("Didn't find kettle-job-embedded in etl-step with id " + etl_step + " for component " + component); } String fileAttr = etl.attributeValue("file"); String jobFilePath = moduleDir + fileAttr.substring(fileAttr.indexOf('/') + 1); return jobFilePath; } /** * Abarbeiten des Kettle Jobs mit kettle (Embedded). * * @param jobfile * Dateiname des Jobs * @param jobtype * ktr=Transformation, kjb=Job * @param params * Parameter * @throws Exception */ public final StringBuffer kettleCallEmbedded(final String jobfile, final Map jobParams, boolean isPostgres) { return kettleCallEmbedded(this.mandantenId, jobfile, jobParams, isPostgres); } /** * Abarbeiten des Kettle Jobs mit kettle (Embedded). * * @param jobfile * Dateiname des Jobs * @param params * Parameter * @param isPostgres - für Metainformationen, wenn false dann wird Informix genommen * @return StringBuffer mit Logging für Ausgabe im Browser * @throws Exception */ private final StringBuffer kettleCallEmbedded(final String mandantenID, final String jobfile, final Map jobParams, boolean isPostgres) { StringWriter writer = new StringWriter(); WriterAppender appender = new WriterAppender(new SimpleLayout(), writer); Level oldLevel = logger.getLevel(); logger.setLevel(Level.DEBUG); logger.addAppender(appender); try { MemoryMetaStore metastore = new MemoryMetaStore(); /* * get all db connections configured in databases.xml as * DataSource's by name */ His1DataSources dataSources = SuperXManager.getBean("dataSources", His1DataSources.class); // if no dataSources defined, at least try eduetl Set dbNames = dataSources != null ? dataSources.getKeys() : Set.of("eduetl"); for (String dbName : dbNames) { DatabaseMeta dbmeta = new DatabaseMeta(dbName, isPostgres ? "POSTGRESQL" : "INFORMIX", "JNDI", null, dbName, "1521", null, null); DatabaseMetaStoreUtil.createDatabaseElement(metastore, dbmeta); logger.info("Init pdi database connection " + dbName); } JobMeta jobMeta = new JobMeta(null, jobfile, null, metastore, null); jobMeta.setSafeModeEnabled(true); org.pentaho.di.job.Job job = new org.pentaho.di.job.Job(null, jobMeta); job.setLogLevel(this.logLevel); String[] params = jobMeta.listParameters(); for (String param : params) { logger.info("Job-Param: " + param); logger.info(" -> defaults to " + jobMeta.getParameterDefault(param)); } job.copyParametersFrom(jobMeta); // to be able to track progress job.setInteractive(true); if (jobParams != null) { for (String param : jobParams.keySet()) { String value = jobParams.get(param); if (param.equals(PARAM_LOGLEVEL)) { job.setLogLevel(LogLevel.valueOf(value)); logger.info("Set kettle LogLevel to " + value); continue; } logger.info("PARAM " + param + " = " + value); jobMeta.setParameterValue(param, value); } } KettleLogStore.discardLines(job.getLogChannelId(), true); int logStartLine = KettleLogStore.getLastBufferLineNr(); job.addJobEntryListener(new JobEntryListener() { private Map jobEntryCounts = new HashMap<>(); @Override public void beforeExecution(Job ajob, JobEntryCopy jobEntryCopy, JobEntryInterface jobEntryInterface) { String name = ajob.getJobname() + "_" + jobEntryCopy.getName(); int count = jobEntryCounts.containsKey(name) ? jobEntryCounts.get(name).intValue() + 1 : 1; jobEntryCounts.put(name, Integer.valueOf(count)); if (jobEntryCounts.get(name).intValue() > 10) { throw new RuntimeException("Job creating too many job entries of " + name); } logger.debug("JobEntry: " + name + " Count: " + jobEntryCounts.get(name)); } @Override public void afterExecution(Job job, JobEntryCopy jobEntryCopy, JobEntryInterface jobEntryInterface, Result result) { // TODO Auto-generated method stub } }); job.activateParameters(); job.start(); job.waitUntilFinished(); List logEvents = KettleLogStore.getLogBufferFromTo(job.getLogChannelId(), true, logStartLine, KettleLogStore.getLastBufferLineNr()); StringBuffer errors = new StringBuffer(); for ( KettleLoggingEvent event : logEvents ) { logger.debug(event.getMessage().toString()); if ( event.getLevel() == LogLevel.ERROR ) { errors.append( event.getMessage() ); } } if (errors.length() > 0) { logger.info(job.getErrors() + " Errors executing Kettle job " + jobfile); throw new RuntimeException("Errors in Kettle job " + jobfile + ": " + errors ); } logger.info("Job erfolgreich durchgeführt"); } catch (Exception ex) { logger.error("Error executing Kettle job " + jobfile, ex); throw new RuntimeException(ex); } finally { logger.removeAppender(appender); logger.setLevel(oldLevel); } return writer.getBuffer(); } public static void main(String args[]) { String jobfile = "file:///home/superx/data-integration/exceltest.kjb"; Map jobParams = new HashMap(); jobParams.put("PATH_TO_EXCELFILE", "/home/superx/tmp/testexcel2.xlsx"); EtlStarter es = new EtlStarter("default"); es.kettleCallEmbedded(jobfile, jobParams, false); } }