Browse Source

Auch von Masken KettleJobs ausfuehren lassen

(cherry picked from commit 460037bf0e86b5ff044d3fd29ad4a4f276abe8be)
master
Meikel Bisping 2 weeks ago
parent
commit
8133066390
  1. 116
      src/de/superx/bin/fm/EtlStarter.java

116
src/de/superx/bin/fm/EtlStarter.java

@ -4,6 +4,10 @@ import java.io.File; @@ -4,6 +4,10 @@ import java.io.File;
import java.io.IOException;
import java.io.StringWriter;
import java.io.Writer;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
@ -40,9 +44,11 @@ public class EtlStarter implements TemplateTransformModel { @@ -40,9 +44,11 @@ public class EtlStarter implements TemplateTransformModel {
private static Logger logger = Logger.getLogger(EtlStarter.class);
private String mandantenId = "default";
private LogLevel logLevel = LogLevel.BASIC;
public EtlStarter(String mandantenId) {
this.mandantenId = mandantenId;
}
public EtlStarter(String mandantenId) {
this.mandantenId = mandantenId;
}
@Override
public Writer getWriter(Writer paramWriter, Map paramMap) throws TemplateModelException, IOException {
logger.info("**EtlStarter**");
@ -73,7 +79,49 @@ public class EtlStarter implements TemplateTransformModel { @@ -73,7 +79,49 @@ public class EtlStarter implements TemplateTransformModel {
logger.info("PARAM: " + key + " -> " + value.getAsString());
}
String moduleDir=SuperXManager.getModuleDir()+ File.separator + component + File.separator;
String moduleDir = SuperXManager.getModuleDir();
if (moduleDir == null || moduleDir.equals("")) {
if (System.getProperty("MODULE_PFAD") != null && !System.getProperty("MODULE_PFAD").toString().equals(""))
moduleDir = System.getProperty("MODULE_PFAD").toString();
else
throw new IOException(
"Module-Pfad kann nicht ermittelt werden, bitte setzen Sie den JVM Parameter -DMODULE_PFAD=...");
}
String jobFilePath = null;
if (component.contentEquals("manual")) {
jobFilePath = getFilePathFromDatabase(etl_step, moduleDir);
} else {
jobFilePath = getFilePathFromModule(component, etl_step, moduleDir);
}
logger.info("Kettle job: " + jobFilePath);
kettleCallEmbedded(jobFilePath, params);
return null;
}
private String getFilePathFromDatabase(String etl_step, String moduleDir) throws IOException {
String jobFilePath = null;
try (Connection con = SxPools.get(mandantenId).getConnection();
PreparedStatement pst = con.prepareStatement("select filepath from sx_jobs where uniquename=?")) {
pst.setString(1, etl_step);
ResultSet rs = pst.executeQuery();
while (rs.next()) {
jobFilePath = rs.getString(1);
}
rs.close();
} catch (SQLException e) {
e.printStackTrace();
throw new IOException("Fehler beim Auslesen von sx_jobs mit uniquename " + etl_step + " " + e);
}
if (jobFilePath == null) {
throw new IOException(
"Fehler beim Auslesen von sx_jobs mit uniquename " + etl_step + " Kein Eintrag gefunden");
}
return moduleDir + File.separator + jobFilePath;
}
private String getFilePathFromModule(String component, String etl_step, String moduleDir)
throws TemplateModelException {
moduleDir += File.separator + component + File.separator;
if (!(new File(moduleDir)).exists()) {
throw new TemplateModelException("Component not found: " + component);
}
@ -96,43 +144,38 @@ public class EtlStarter implements TemplateTransformModel { @@ -96,43 +144,38 @@ public class EtlStarter implements TemplateTransformModel {
}
String fileAttr = etl.attributeValue("file");
String jobFilePath = moduleDir + fileAttr.substring(fileAttr.indexOf('/') + 1);
logger.info("Kettle job: " + jobFilePath);
kettleCallEmbedded(jobFilePath, params);
return null;
return jobFilePath;
}
/**
* Abarbeiten des Kettle Jobs mit kettle (Embedded).
*
* @param jobfile
* Dateiname des Jobs
* @param jobtype
* ktr=Transformation, kjb=Job
* @param params
* Parameter
* @param jobfile Dateiname des Jobs
* @param jobtype ktr=Transformation, kjb=Job
* @param params Parameter
* @throws Exception
*/
final StringBuffer kettleCallEmbedded(final String jobfile, final Map<String, String> jobParams) {
return kettleCallEmbedded("default",jobfile, jobParams, true);
return kettleCallEmbedded("default", jobfile, jobParams, true);
}
/**
* Abarbeiten des Kettle Jobs mit kettle (Embedded).
*
* @param jobfile
* Dateiname des Jobs
* @param params
* Parameter
* @param isPostgres - für Metainformationen, wenn false dann wird Informix genommen
* @return StringBuffer mit Logging für Ausgabe im Browser
* @param jobfile Dateiname des Jobs
* @param params Parameter
* @param isPostgres - für Metainformationen, wenn false dann wird Informix
* genommen
* @return StringBuffer mit Logging für Ausgabe im Browser
* @throws Exception
*/
public final StringBuffer kettleCallEmbedded(final String mandantenID,final String jobfile, final Map<String, String> jobParams, boolean isPostgres) {
public final StringBuffer kettleCallEmbedded(final String mandantenID, final String jobfile,
final Map<String, String> jobParams, boolean isPostgres) {
StringWriter writer = new StringWriter();
WriterAppender appender = new WriterAppender( new SimpleLayout(), writer );
org.apache.log4j.Level oldLevel=logger.getLevel();
WriterAppender appender = new WriterAppender(new SimpleLayout(), writer);
org.apache.log4j.Level oldLevel = logger.getLevel();
logger.setLevel(org.apache.log4j.Level.ALL);
logger.addAppender( appender );
logger.addAppender(appender);
try {
// usually we initialize kettle env in EdustoreManager
@ -146,24 +189,27 @@ public class EtlStarter implements TemplateTransformModel { @@ -146,24 +189,27 @@ public class EtlStarter implements TemplateTransformModel {
}
JndiUtil.initJNDI();
MemoryMetaStore metastore = new MemoryMetaStore();
//Dies hatte für Mandantenbetrieb nicht funktioniert, daher Umstellung auf SxPool , s.u.
//DataSourceProviderFactory.setDataSourceProviderInterface(new KettleDataSourceProvider(mandantenID,logger));
// Dies hatte für Mandantenbetrieb nicht funktioniert, daher Umstellung auf
// SxPool , s.u.
// DataSourceProviderFactory.setDataSourceProviderInterface(new
// KettleDataSourceProvider(mandantenID,logger));
/*
* get all db connections configured in databases.xml as
* DataSource's by name
* get all db connections configured in databases.xml as DataSource's by name
*/
// Enumeration<String> dbNames =
// Enumeration<String> dbNames =
// this.dbhandlerPool.getLogicalDatabaseNames();
List<String> dbNames = Arrays.asList(new String[] { "eduetl" });
for (String dbName : dbNames) {
// Dies hatte für Mandantenbetrieb nicht funktioniert, daher Umstellung auf SxPool , s.u.
// Dies hatte für Mandantenbetrieb nicht funktioniert, daher Umstellung auf
// SxPool , s.u.
//DatabaseMeta dbmeta = new DatabaseMeta(dbName, isPostgres ? "POSTGRESQL" : "INFORMIX", "JNDI", null, dbName, "1521", null, null);
DatabaseMeta dbmeta=SxPools.get(mandantenID).getKettleDatabaseMeta();
DatabaseMetaStoreUtil.createDatabaseElement(metastore, dbmeta);
// DatabaseMeta dbmeta = new DatabaseMeta(dbName, isPostgres ? "POSTGRESQL" :
// "INFORMIX", "JNDI", null, dbName, "1521", null, null);
DatabaseMeta dbmeta = SxPools.get(mandantenID).getKettleDatabaseMeta();
DatabaseMetaStoreUtil.createDatabaseElement(metastore, dbmeta);
logger.info("Init pdi database connection " + dbName+" (DBName:"+dbmeta.getDatabaseName()+")");
logger.info("Init pdi database connection " + dbName + " (DBName:" + dbmeta.getDatabaseName() + ")");
}
JobMeta jobMeta = new JobMeta(null, jobfile, null, metastore, null);
@ -213,7 +259,7 @@ public class EtlStarter implements TemplateTransformModel { @@ -213,7 +259,7 @@ public class EtlStarter implements TemplateTransformModel {
Map<String, String> jobParams = new HashMap<String, String>();
jobParams.put("PATH_TO_EXCELFILE", "/home/superx/tmp/testexcel2.xlsx");
EtlStarter es = new EtlStarter("default");
es.kettleCallEmbedded("default",jobfile, jobParams,false);
es.kettleCallEmbedded("default", jobfile, jobParams, false);
}
}

Loading…
Cancel
Save