package de.superx.bin; import java.io.File; import java.io.IOException; import java.sql.SQLException; import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.StringTokenizer; import org.pentaho.di.core.KettleEnvironment; import org.pentaho.di.core.exception.KettleException; import de.memtext.util.GetOpts; import de.memtext.util.StringUtils; import de.superx.bin.fm.EtlStarter; import de.superx.common.DBServletException; import de.superx.common.SuperX_el; import de.superx.common.SxResultRow; import de.superx.common.SxResultSet; import de.superx.servlet.ServletUtils; import de.superx.servlet.SuperXManager; import de.superx.servlet.SxPools; public class KettleExecutor { private String mandantenID; private SxJob sxJob; private boolean isPostgres = true; private StringBuffer resultMessages = new StringBuffer(""); private Map jobParams = new HashMap(); public KettleExecutor(String mandantenID) { this.mandantenID = mandantenID; isPostgres = SxPools.get(mandantenID).getDatabaseAbbr().equals(("PG")); } public void setParams(Map jobParams) { this.jobParams = jobParams; } public StringBuffer perform() throws IOException, SQLException, DBServletException { try { initSxJob(); EtlStarter es = new EtlStarter(this.mandantenID); if (!isModusVorschau()) { check_sql("vor"); } resultMessages.append(es.kettleCallEmbedded(mandantenID, sxJob.getFilepath(), jobParams, isPostgres)); if (isModusVorschau()) { //Vor Ergebnis Vorschau einfügen resultMessages.insert(0, getPreviewInfo()); } else { check_sql("nach"); } } catch (Exception e) { throw new IOException(e); } return resultMessages; } private boolean isModusVorschau() { return jobParams.containsKey("Modus") && jobParams.get("Modus") != null && jobParams.get("Modus").equals("3"); } private StringBuilder getPreviewInfo() throws IOException { StringBuilder result = new StringBuilder(); File loadjoblog = new File(new File(sxJob.getFilepath().replaceAll("file:", "")).getParentFile().getPath() + File.separator + "ladejob.log"); if (loadjoblog.exists()) { String rawpreview = StringUtils.readFile(loadjoblog); StringTokenizer st = new StringTokenizer(rawpreview, "\n"); result.append("Vorschau\n"); String currentHeader = ""; while (st.hasMoreTokens()) { String line = st.nextToken(); if (line.indexOf("|") > -1) { String firstpart = line.substring(0, line.indexOf("|")); if (!firstpart.equals(currentHeader)) { result.append(firstpart + "\n"); currentHeader = firstpart; result.append("-->" + line.substring(line.indexOf("|") + 1) + "\n"); } else { result.append("-->" + StringUtils.replace(line, firstpart + "|", "") + "\n"); } } } } return result; } private void check_sql(String pos) throws SQLException, DBServletException { if (sxJob.check_sql != null) { SuperX_el el = ServletUtils.execute_el("check_sql " + pos + " Durchführung von Ladejob " + sxJob.getCaption(), sxJob.check_sql, false, mandantenID); SxResultSet rs = el.getResultSet(); for (Iterator it = rs.iterator(); it.hasNext();) { SxResultRow row = (SxResultRow) it.next(); Object erg = row.get(0); if (erg != null) { resultMessages.append("Prüfselektion " + pos + " Durchführung von Ladejob " + sxJob.getCaption()); resultMessages.append("\n" + erg.toString() + "\n"); } } } } private void initSxJob() throws SQLException, DBServletException { SxResultSet rs = ServletUtils.execute("Einlesen von SxJob)", "select caption,filepath,params,check_sql from sx_jobs where tid=" + jobParams.get("Job"), mandantenID); sxJob = new SxJob(); for (Iterator it = rs.iterator(); it.hasNext();) { SxResultRow row = (SxResultRow) it.next(); sxJob.setCaption(row.get(0).toString()); sxJob.setFilepath("file://" + SuperXManager.getModuleDir() + File.separator + row.get(1).toString()); sxJob.setParams((String) row.get(2)); sxJob.setCheck_sql((String) row.get(3)); } } public static void main(String args[]) { String usage = "usage: -mandantenID:default -MODULE_PFAD:/home/superx/db/module -WEB_INF_PFAD:/home/superx/webserver/tomcat/webapps/superx/WEB-INF -job_uniquename:abc -path_to_uploadfile:/home/superx (optional)"; GetOpts.setOpts(args); String isdrin = GetOpts.isAllRequiredOptionsPresent("-mandantenID,-WEB_INF_PFAD,-MODULE_PFAD,-job_uniquename"); if (isdrin != null) { System.err.println(usage); System.exit(1); } String mandantenID = GetOpts.getValue("-mandantenID"); String job_uniquename = GetOpts.getValue("-job_uniquename"); String webinfpfad=GetOpts.getValue("-WEB_INF_PFAD"); if(webinfpfad != null && !webinfpfad.equals("")) SuperXManager.setWEB_INFPfad(GetOpts.getValue("-WEB_INF_PFAD")); SuperXManager.setModuleDir(GetOpts.getValue("-MODULE_PFAD")); String path_to_uploadfile = ""; if (GetOpts.isPresent("-path_to_uploadfile")) { path_to_uploadfile = GetOpts.getValue("-path_to_uploadfile"); if (!new File(path_to_uploadfile).exists()) { System.out.println("Fehler: Datei " + path_to_uploadfile + " nicht gefunden"); System.exit(-1); } } try { //muss vor SxPools init ausgeführt werden, sonst kann kein kettleDatabaseMeta-Objekt erzeugt werden java.lang.RuntimeException: Database type not found! KettleEnvironment.init(); SxPools.init(); SxPools.get(mandantenID).init(); //SxPools.get(mandantenID).initLogging(true); SxPools.resetAllPools(); } catch (Exception e) { System.out.println("Fehler beim Datenbankverbindungsaufbau " + e); e.printStackTrace(); ; System.exit(-1); } try { String jobtid = getJobTid(job_uniquename, mandantenID); KettleExecutor ke = new KettleExecutor(mandantenID); Map jobParams = new HashMap(); jobParams.put("Job", jobtid); jobParams.put("PATH_TO_UPLOADFILE", path_to_uploadfile); ke.setParams(jobParams); ke.perform(); System.out.println("Keine Fehler aufgefallen"); } catch (Exception e) { System.out.println("Fehler bei Verarbeitung " + e); e.printStackTrace(); System.exit(-1); } } private static String getJobTid(String job_uniquename, String mandantenID) throws SQLException, DBServletException { String tid = ""; String sql = "select tid from sx_jobs where uniquename='" + job_uniquename + "'"; SxResultSet rs = ServletUtils.execute("Suche tid für sx_job mit uniquename " + job_uniquename, sql, mandantenID); if (rs.size() == 0) throw new RuntimeException("Kein sx_job mit uniquename " + job_uniquename + " gefunden"); if (rs.size() > 1) throw new RuntimeException("Mehrere sx_jobs mit uniquename " + job_uniquename + " gefunden"); for (Iterator it = rs.iterator(); it.hasNext();) { SxResultRow row = (SxResultRow) it.next(); Object erg = row.get(0); if (erg != null) tid = erg.toString(); } return tid; } private class SxJob { private String filepath; private String params; private String check_sql; private String caption; public String getCaption() { return caption; } public void setCaption(String caption) { this.caption = caption; } public String getFilepath() { return filepath; } public void setFilepath(String filepath) { this.filepath = filepath; } public String getParams() { return params; } public void setParams(String params) { this.params = params; } public String getCheck_sql() { return check_sql; } public void setCheck_sql(String check_sql) { this.check_sql = check_sql; } } }