/*
 * de.superx.etl - a package for controlling ETL routines
 * Copyright (C) 2021 Daniel Quathamer
 *
 * This package is licensed under the CampusSource License;
 * http://www.campussource.de/org/license/
 */
package de.superx.elt;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PrintStream;
import java.io.Reader;
import java.io.UnsupportedEncodingException;
import java.math.BigDecimal;
import java.nio.charset.CodingErrorAction;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Types;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Properties;

import de.superx.util.SqlStringUtils;

import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.parsers.ParserConfigurationException;
import javax.xml.xpath.XPathFactory;
import javax.xml.xpath.XPath;
import javax.xml.xpath.XPathConstants;

import org.w3c.dom.Document;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;
import org.xml.sax.*;

import org.postgresql.PGConnection;
import org.postgresql.copy.CopyManager;

import de.superx.bin.SxConnection;
import de.superx.bin.SxJdbcClient;
import de.superx.bin.UploadRecords;
import de.superx.elt.util.FileUtils;
import de.memtext.util.DateUtils;
import de.memtext.util.GetOpts;
import de.memtext.util.StringUtils;
import de.memtext.util.XMLUtils;

/**
 * Loads a delimited text file (UNL/CSV style) or an XML file into a database table.
 *
 * <p>CSV files are loaded with PostgreSQL {@code COPY} when the connection was recognised as
 * PostgreSQL by {@link #getConnection(Connection, String)} and {@code inserts} is neither
 * {@code "simple"} nor {@code "batch"}; otherwise (and always for XML) rows are written with a
 * {@link PreparedStatement}, batched when the driver supports batch updates.
 *
 * <p>Row error handling for the PreparedStatement path is controlled by {@link #setMode(String)}:
 * <ul>
 *   <li>{@code stop} – stop at the first row with errors (rows read before it are still written)
 *       and set the return code to 1;</li>
 *   <li>{@code exclude-field} – set unconvertible fields to NULL and write the row;</li>
 *   <li>{@code exclude-row} – skip rows that contain errors;</li>
 *   <li>{@code transaction} – no special handling here: an unconvertible field is left unbound
 *       (NOTE(review): the JDBC driver is expected to reject the statement then — confirm).</li>
 * </ul>
 * Error messages for faulty rows are written to {@code System.err}.
 */
public class TableUploader {
	/** Initial capacity of the column arrays; they grow if a table has more columns. */
	private static final int maxCols = 1000;

	private String logfile;
	private String dbpropfile;
	private String mode = "stop";
	private String inFormat;
	private String targetTable;
	private String srcFile;
	private boolean header = false;
	private String delim = "^";
	private String encoding = SqlStringUtils.getEncoding();
	private String inserts = "";
	private boolean truncateTargetTable = true;
	private boolean continueAfterError;
	private boolean removeTrailingDelim = true;
	private boolean isPostgres;
	private boolean useBatch = true;
	private String[] insert_cols = new String[maxCols];
	private int[] insert_types = new int[maxCols];
	private int numberOfColumns;
	public long numberOfRows;
	private int returnCode;
	private String xml_search_path;
	private Connection uploadConnection;
	private DatabaseMetaData dbmd;
	/** Statement of the upload currently in progress; null outside an upload. */
	private PreparedStatement pst;

	/** What to do with a row after all of its fields have been bound. */
	private enum RowAction {
		WRITE, SKIP, STOP
	}

	public TableUploader() {
	}

	public Connection getUploadConnection() {
		return uploadConnection;
	}

	public void setUploadConnection(Connection uploadConnection) {
		this.uploadConnection = uploadConnection;
	}

	public boolean isRemoveTrailingDelim() {
		return removeTrailingDelim;
	}

	public void setRemoveTrailingDelim(boolean removeTrailingDelim) {
		this.removeTrailingDelim = removeTrailingDelim;
	}

	/** @param header true if the first line of a CSV file is a header line and must not be loaded */
	public void setHeader(boolean header) {
		this.header = header;
	}

	public String getDbpropfile() {
		return dbpropfile;
	}

	public void setDbpropfile(String dbpropfile) {
		this.dbpropfile = dbpropfile;
	}

	public String getMode() {
		return mode;
	}

	/**
	 * Sets the row error handling mode. Any value other than {@code stop}, {@code exclude-field}
	 * or {@code transaction} (including null) is stored as {@code exclude-row}.
	 */
	public void setMode(String mode) {
		if (!"stop".equals(mode) && !"exclude-field".equals(mode) && !"transaction".equals(mode)) {
			mode = "exclude-row";
		}
		this.mode = mode;
	}

	public String getInFormat() {
		return inFormat;
	}

	/** @param inFormat {@code "xml"} (case-insensitive) for XML input; anything else means CSV */
	public void setInFormat(String inFormat) {
		this.inFormat = inFormat;
	}

	public String getTargetTable() {
		return targetTable;
	}

	public void setTargetTable(String targetTable) {
		this.targetTable = targetTable;
	}

	public String getSrcFile() {
		return srcFile;
	}

	public void setSrcFile(String srcFile) {
		this.srcFile = srcFile;
	}

	public String getDelim() {
		return delim;
	}

	/**
	 * Sets the field delimiter. {@code "tab"} means a tab character; null or empty falls back to
	 * the default delimiter {@code ^}.
	 */
	public void setDelim(String delim) {
		if ("tab".equals(delim)) {
			delim = "\t";
		}
		if (delim == null || delim.isEmpty()) {
			delim = "^";
		}
		this.delim = delim;
	}

	public String getEncoding() {
		return encoding;
	}

	/** Sets the source file encoding; null or empty means UTF-8. */
	public void setEncoding(String encoding) {
		if (encoding == null || encoding.equals("")) {
			encoding = "UTF-8";
		}
		this.encoding = encoding;
	}

	public String getInserts() {
		return inserts;
	}

	/**
	 * Selects the insert strategy: {@code "batch"} forces batched INSERTs, {@code "simple"} forces
	 * single INSERTs; any other value allows COPY on PostgreSQL. Null is treated as empty.
	 */
	public void setInserts(String inserts) {
		if (inserts == null) {
			inserts = "";
		}
		if (inserts.equalsIgnoreCase("batch")) {
			useBatch = true;
		}
		if (inserts.equalsIgnoreCase("simple")) {
			useBatch = false;
		}
		this.inserts = inserts;
	}

	public boolean isTruncateTargetTable() {
		return truncateTargetTable;
	}

	public void setTruncateTargetTable(boolean truncateTargetTable) {
		this.truncateTargetTable = truncateTargetTable;
	}

	/** Only {@code "true"} (case-insensitive) enables truncation; null or anything else disables it. */
	public void setTruncateTargetTable(String truncateTargetTable) {
		this.truncateTargetTable = "true".equalsIgnoreCase(truncateTargetTable);
	}

	/** @return 0 on success, 1 if the last upload failed or was stopped because of a faulty row */
	public int getReturnCode() {
		return returnCode;
	}

	public void setReturnCode(int returnCode) {
		this.returnCode = returnCode;
	}

	public boolean isContinueAfterError() {
		return continueAfterError;
	}

	public void setContinueAfterError(boolean continueAfterError) {
		this.continueAfterError = continueAfterError;
	}

	public String getXml_search_path() {
		return xml_search_path;
	}

	/** @param xml_search_path XPath selecting the row nodes; null means all {@code <row>} elements */
	public void setXml_search_path(String xml_search_path) {
		this.xml_search_path = xml_search_path;
	}

	/**
	 * Uploads {@code srcFile} into {@code targetTable}, optionally deleting the existing rows first.
	 *
	 * @return number of rows loaded
	 * @throws Exception wrapping the original failure; the return code is set to 1 in that case
	 */
	public long uploadFile() throws Exception {
		long rows;
		returnCode = 0;
		try {
			if (truncateTargetTable) {
				try (Statement stm = uploadConnection.createStatement()) {
					stm.execute("delete from " + this.targetTable + ";");
				}
			}
			if ("xml".equalsIgnoreCase(inFormat)) {
				rows = uploadXML();
			} else {
				rows = uploadCSV();
			}
		} catch (Exception e) {
			returnCode = 1;
			throw new Exception(e);
		}
		return rows;
	}

	/**
	 * Loads an XML file. Each row node's children are matched to table columns by their
	 * {@code name} attribute (case-insensitive); columns without a matching child are set to NULL.
	 */
	private long uploadXML() throws Exception {
		numberOfRows = 0;
		Document mydomres = de.superx.elt.EtlUtils.buildDocumentFromXmlFile(srcFile);
		NodeList rowlist;
		if (xml_search_path == null) {
			rowlist = mydomres.getElementsByTagName("row");
		} else {
			// Saxon's XPath implementation is instantiated explicitly instead of XPathFactory.newInstance()
			XPathFactory factory = new net.sf.saxon.xpath.XPathFactoryImpl();
			XPath xPath = factory.newXPath();
			rowlist = (NodeList) xPath.compile(xml_search_path).evaluate(mydomres, XPathConstants.NODESET);
		}

		initializeColumnSchema();
		// Lower-cased column name -> column index, so each child node is matched with one lookup
		Map<String, Integer> columnByName = new HashMap<>();
		for (int col = 0; col < numberOfColumns; col++) {
			columnByName.put(insert_cols[col].toLowerCase(Locale.ROOT), col);
		}
		String insertHead = createPreparedStatementHead();

		try (PreparedStatement stmt = uploadConnection.prepareStatement(insertHead)) {
			pst = stmt;
			if (useBatch) {
				pst.clearBatch();
			}
			int rowCount = rowlist.getLength();
			for (int rowIndex = 0; rowIndex < rowCount; rowIndex++) {
				int rowNumber = rowIndex + 1; // 1-based row number for error messages
				Node rownode = rowlist.item(rowIndex);
				// Values of the previous row must not leak into this one
				pst.clearParameters();
				boolean[] assigned = new boolean[numberOfColumns];
				String errmsg = "";
				for (Iterator<?> it = XMLUtils.getChildNodeIterator(rownode); it.hasNext();) {
					Node fldNode = (Node) it.next();
					String fieldName = XMLUtils.getAttribValue(fldNode, "name");
					Integer col = fieldName == null ? null : columnByName.get(fieldName.toLowerCase(Locale.ROOT));
					if (col == null) {
						continue;
					}
					String feld_wert = "";
					try {
						feld_wert = XMLUtils.getTheValue(fldNode).trim();
						feld_wert = StringUtils.replace(feld_wert, "CDATASTART", "");
					} catch (IllegalArgumentException e) {
						// The node has no value: keep what was read so far (empty string becomes NULL)
					}
					errmsg = feld_wert_to_pst(rowNumber, col, errmsg, feld_wert);
					assigned[col] = true;
				}
				for (int col = 0; col < numberOfColumns; col++) {
					if (!assigned[col]) {
						errmsg = feld_wert_to_pst(rowNumber, col, errmsg, "");
					}
				}
				RowAction action = decideRowAction(errmsg);
				if (action == RowAction.STOP) {
					break;
				}
				if (action == RowAction.WRITE) {
					writeRow();
				}
			}
			if (useBatch) {
				pst.executeBatch();
			}
		} finally {
			pst = null;
		}
		return numberOfRows;
	}

	/** Loads a CSV file, via COPY on PostgreSQL unless INSERTs were requested explicitly. */
	private long uploadCSV() throws Exception {
		boolean useCopy = isPostgres && !inserts.equalsIgnoreCase("simple") && !inserts.equalsIgnoreCase("batch");
		if (!useCopy) {
			return uploadCSVwithAnsiSQL(srcFile);
		}
		// Work on a local path: the srcFile field must keep pointing at the caller's file,
		// because the temporary copy is deleted after loading.
		String copySource = removeTrailingDelim ? removeTrailingDelim(srcFile) : srcFile;
		return uploadCSVinPostgres(copySource, removeTrailingDelim);
	}

	/**
	 * Writes a copy of {@code srcFile} to {@code srcFile + ".tmp"} in which one trailing delimiter
	 * is removed from every line that ends with it.
	 *
	 * @return path of the temporary file
	 */
	private String removeTrailingDelim(String srcFile)
			throws UnsupportedEncodingException, FileNotFoundException, IOException {
		String returnSrcFile = srcFile + ".tmp";
		try (BufferedReader in = new BufferedReader(new InputStreamReader(new FileInputStream(srcFile), encoding));
				BufferedWriter out = new BufferedWriter(
						new OutputStreamWriter(new FileOutputStream(returnSrcFile, false), encoding))) {
			String line;
			while ((line = in.readLine()) != null) {
				if (line.endsWith(delim)) {
					line = line.substring(0, line.length() - delim.length());
				}
				out.write(line);
				out.newLine();
			}
		}
		return returnSrcFile;
	}

	/**
	 * Loads {@code srcFile} with PostgreSQL {@code COPY ... FROM STDIN}. If the database ctype is an
	 * ISO/Windows-1252 locale, the content is first converted via {@code FileUtils.convertToIso} and
	 * streamed from an ISO-8859-9 temporary file.
	 *
	 * @param deleteSrcFile delete {@code srcFile} afterwards (used for the temporary copy)
	 * @return number of rows reported by COPY
	 */
	private long uploadCSVinPostgres(String srcFile, boolean deleteSrcFile) throws Exception {
		String copySql = "COPY " + targetTable + " FROM STDIN WITH DELIMITER '" + delim + "' NULL '' ENCODING '"
				+ encoding + "'";
		copySql += header ? " HEADER" : "";
		String isoFile = null;
		try {
			PGConnection pgConnection = uploadConnection instanceof PGConnection
					? (PGConnection) uploadConnection
					: uploadConnection.unwrap(PGConnection.class);
			CopyManager cpm = pgConnection.getCopyAPI();
			Reader source;
			if (isIsoLocaleDatabase()) {
				String srcFileContent = de.superx.elt.EtlUtils.getFileContentsWithEncoding(srcFile, encoding);
				isoFile = srcFile + "_iso.tmp";
				String srcFileContentValidIso = FileUtils.convertToIso(srcFileContent, "postgres");
				de.superx.elt.EtlUtils.saveFileContentsWithEncoding(isoFile, srcFileContentValidIso, "iso-8859-9");
				source = new InputStreamReader(new FileInputStream(isoFile), "iso-8859-9");
			} else {
				// Read with the configured encoding, like every other reader in this class
				source = new InputStreamReader(new FileInputStream(srcFile), encoding);
			}
			long rows;
			try (Reader in = new BufferedReader(source)) {
				rows = cpm.copyIn(copySql, in);
			}
			numberOfRows = rows;
			return rows;
		} finally {
			// Temporary files are removed whether or not COPY succeeded
			if (isoFile != null) {
				new File(isoFile).delete();
			}
			if (deleteSrcFile) {
				new File(srcFile).delete();
			}
		}
	}

	/**
	 * @return true if the current database's {@code datctype} contains "euro", "1252" or "8859"
	 */
	private boolean isIsoLocaleDatabase() throws SQLException {
		// Parenthesised: without them AND binds tighter than OR and other databases would match too
		String sql = "SELECT 1 FROM pg_catalog.pg_database WHERE datname = ?"
				+ " AND (datctype ILIKE '%euro%' OR datctype ILIKE '%1252%' OR datctype ILIKE '%8859%')";
		try (PreparedStatement ps = uploadConnection.prepareStatement(sql)) {
			ps.setString(1, uploadConnection.getCatalog());
			try (ResultSet rs = ps.executeQuery()) {
				return rs.next();
			}
		}
	}

	/**
	 * Loads a delimited file with INSERT statements. A line ending in a backslash continues on the
	 * next line (the backslash is replaced by a newline). With {@code header} set, the first line
	 * is skipped.
	 */
	private long uploadCSVwithAnsiSQL(String srcFile) throws SQLException, FileNotFoundException, IOException {
		numberOfRows = 0;
		initializeColumnSchema();
		String insertHead = createPreparedStatementHead();
		try (BufferedReader in = new BufferedReader(new InputStreamReader(new FileInputStream(srcFile), encoding));
				PreparedStatement stmt = uploadConnection.prepareStatement(insertHead)) {
			pst = stmt;
			if (useBatch) {
				pst.clearBatch();
			}
			int physicalLine = 0;
			if (header && in.readLine() != null) {
				physicalLine++;
			}
			String text;
			while ((text = in.readLine()) != null) {
				physicalLine++;
				int recordLine = physicalLine; // line on which this record starts, for error messages
				while (text.endsWith("\\")) {
					text = text.substring(0, text.length() - 1);
					String next = in.readLine();
					if (next == null) {
						break;
					}
					physicalLine++;
					text += "\n" + next;
				}
				// Values of the previous row must not leak into this one
				pst.clearParameters();
				String errors = createPreparedInsertStatement(recordLine, insertHead, text);
				RowAction action = decideRowAction(errors);
				if (action == RowAction.STOP) {
					break;
				}
				if (action == RowAction.WRITE) {
					writeRow();
				}
			}
			if (useBatch) {
				pst.executeBatch();
			}
		} finally {
			pst = null;
		}
		return numberOfRows;
	}

	/**
	 * Binds the fields of one delimited line to {@link #pst}. Columns missing at the end of the
	 * line are set to NULL.
	 *
	 * @return accumulated error messages; empty if the row is valid
	 */
	private String createPreparedInsertStatement(int line, String insertHead, String text) throws SQLException {
		List<String> fields = splitUnlLine(text, delim);
		String errmsg = "";
		int provided = Math.min(fields.size(), numberOfColumns);
		for (int col = 0; col < provided; col++) {
			errmsg = feld_wert_to_pst(line, col, errmsg, fields.get(col));
		}
		for (int col = provided; col < numberOfColumns; col++) {
			errmsg = feld_wert_to_pst(line, col, errmsg, "");
		}
		if (fields.size() > numberOfColumns) {
			errmsg += tooManyColumnsMessage(fields.size());
		}
		return errmsg;
	}

	/**
	 * Splits one UNL line into fields. A backslash escapes a following backslash or delimiter;
	 * any other backslash is kept literally. A trailing delimiter terminates the last field and
	 * does not produce an extra empty field; an empty line yields one empty field.
	 */
	private static List<String> splitUnlLine(String text, String delim) {
		if (delim.isEmpty()) {
			throw new IllegalArgumentException("delimiter must not be empty");
		}
		List<String> fields = new ArrayList<>();
		StringBuilder current = new StringBuilder();
		boolean fieldPending = false;
		int pos = 0;
		int length = text.length();
		while (pos < length) {
			if (text.startsWith(delim, pos)) {
				fields.add(current.toString());
				current.setLength(0);
				fieldPending = false;
				pos += delim.length();
				continue;
			}
			fieldPending = true;
			char c = text.charAt(pos);
			if (c == '\\' && pos + 1 < length) {
				if (text.charAt(pos + 1) == '\\') {
					current.append('\\');
					pos += 2;
					continue;
				}
				if (text.startsWith(delim, pos + 1)) {
					current.append(delim);
					pos += 1 + delim.length();
					continue;
				}
			}
			current.append(c);
			pos++;
		}
		if (fieldPending || fields.isEmpty()) {
			fields.add(current.toString());
		}
		return fields;
	}

	/**
	 * Decides what happens to a bound row according to {@code mode}; errors are printed to
	 * {@code System.err}. In {@code stop} mode the return code is set to 1.
	 */
	private RowAction decideRowAction(String errors) {
		if (errors.isEmpty()) {
			return RowAction.WRITE;
		}
		System.err.println(errors);
		if (mode.equals("stop")) {
			returnCode = 1;
			return RowAction.STOP;
		}
		if (mode.equals("exclude-row")) {
			return RowAction.SKIP;
		}
		return RowAction.WRITE;
	}

	/** Adds the bound row to the batch (or executes it directly) and counts it. */
	private void writeRow() throws SQLException {
		if (useBatch) {
			pst.addBatch();
		} else {
			pst.executeUpdate();
		}
		numberOfRows++;
	}

	private String tooManyColumnsMessage(int actualColumns) {
		return "Anzahl Spalten in Datei ist " + actualColumns + ", aber es sollten nur " + numberOfColumns
				+ " Spalten sein. Bitte prüfen Sie das Trennzeichen";
	}

	/**
	 * Converts one field value according to the column's JDBC type and binds it to {@link #pst}.
	 * An empty value is bound as NULL. Conversion failures are reported in the returned message;
	 * in {@code exclude-field} mode the field is then set to NULL.
	 *
	 * @param line 1-based line/row number used in error messages
	 * @param col 0-based column index
	 * @param errmsg messages accumulated so far for this row
	 * @return {@code errmsg} plus any new messages
	 */
	private String feld_wert_to_pst(int line, int col, String errmsg, String feld_wert) throws SQLException {
		if (col >= numberOfColumns) {
			return errmsg + tooManyColumnsMessage(col + 1);
		}
		if (feld_wert.equals("")) {
			try {
				pst.setNull(col + 1, insert_types[col]);
			} catch (SQLException e1) {
				errmsg += conversionException(line, col, feld_wert, e1.toString());
			}
			return errmsg;
		}
		try {
			setTypedParameter(col, feld_wert);
		} catch (SQLException | ParseException | IllegalArgumentException e1) {
			// NumberFormatException is an IllegalArgumentException
			errmsg += conversionException(line, col, feld_wert, e1.toString());
			setFieldToNull(col, insert_types, pst);
		}
		return errmsg;
	}

	/** Binds a non-empty value to parameter {@code col + 1} using the column's JDBC type. */
	private void setTypedParameter(int col, String feld_wert) throws SQLException, ParseException {
		int index = col + 1;
		switch (insert_types[col]) {
		case Types.TINYINT:
		case Types.SMALLINT:
		case Types.INTEGER:
			pst.setInt(index, Integer.parseInt(feld_wert.trim()));
			break;
		case Types.BIGINT:
			// BIGINT exceeds the int range
			pst.setLong(index, Long.parseLong(feld_wert.trim()));
			break;
		case Types.FLOAT: // JDBC FLOAT is double precision
		case Types.REAL:
		case Types.DOUBLE:
			pst.setDouble(index, Double.parseDouble(feld_wert.trim()));
			break;
		case Types.NUMERIC:
		case Types.DECIMAL:
			// BigDecimal keeps exact decimal precision, which double would lose
			pst.setBigDecimal(index, new BigDecimal(feld_wert.trim()));
			break;
		case Types.LONGVARCHAR: {
			byte[] bytes = feld_wert.getBytes();
			// The stream length is a byte count, not a character count
			pst.setAsciiStream(index, new ByteArrayInputStream(bytes), bytes.length);
			break;
		}
		case Types.DATE: {
			java.util.Date datum = DateUtils.parse(feld_wert.trim());
			// java.sql.Date.valueOf only accepts the US format yyyy-mm-dd
			pst.setDate(index, java.sql.Date.valueOf(DateUtils.formatUS(datum)));
			break;
		}
		case Types.TIME:
			pst.setTime(index, java.sql.Time.valueOf(feld_wert.trim()));
			break;
		case Types.TIMESTAMP: {
			java.util.Date datum = DateUtils.dateTimeParse(feld_wert.trim());
			pst.setTimestamp(index, java.sql.Timestamp.valueOf(DateUtils.dateTimeFormatUS(datum) + ".0"));
			break;
		}
		case Types.BIT:
		case Types.BOOLEAN:
			pst.setBoolean(index, parseBooleanLiteral(feld_wert));
			break;
		default: // CHAR, VARCHAR and all other types are bound as strings
			// A single blank in a UNL file stands for the empty string
			pst.setString(index, feld_wert.equals(" ") ? "" : feld_wert);
			break;
		}
	}

	/**
	 * Parses a boolean field: true/t/1/y/yes and false/f/0/n/no (case-insensitive).
	 *
	 * @throws IllegalArgumentException for any other value
	 */
	private static boolean parseBooleanLiteral(String value) {
		String normalized = value.trim().toLowerCase(Locale.ROOT);
		if (normalized.equals("true") || normalized.equals("t") || normalized.equals("1")
				|| normalized.equals("y") || normalized.equals("yes")) {
			return true;
		}
		if (normalized.equals("false") || normalized.equals("f") || normalized.equals("0")
				|| normalized.equals("n") || normalized.equals("no")) {
			return false;
		}
		throw new IllegalArgumentException("Not a boolean value: " + value);
	}

	/** In {@code exclude-field} mode, sets parameter {@code k + 1} to NULL; otherwise does nothing. */
	private void setFieldToNull(int k, int[] insert_types, PreparedStatement pst) {
		if (!mode.equals("exclude-field")) {
			return;
		}
		try {
			pst.setNull(k + 1, insert_types[k]);
		} catch (SQLException e3) {
			System.err.println("Invalid Field " + (k + 1) + " could not be set to null");
		}
	}

	/** Formats a conversion error for one field. */
	private String conversionException(int line, int col, String field_value, String error) {
		return "Error in line " + line + " in Column " + (col + 1) + " " + insert_cols[col] + " value "
				+ field_value + ": " + error + "; ";
	}

	/**
	 * Reads the column names and JDBC types of {@code targetTable} from the database metadata and
	 * disables batching if the driver does not support it.
	 *
	 * @throws SQLException if the table has no columns (e.g. it does not exist)
	 */
	private void initializeColumnSchema() throws SQLException {
		if (dbmd == null) {
			// The connection may have been supplied via setUploadConnection only
			dbmd = uploadConnection.getMetaData();
		}
		String tabelle = targetTable;
		if (!dbmd.storesLowerCaseIdentifiers()) {
			tabelle = tabelle.toUpperCase(Locale.ROOT);
		}
		int count = 0;
		try (ResultSet rs = dbmd.getColumns(uploadConnection.getCatalog(), null, tabelle, null)) {
			while (rs.next()) {
				if (count == insert_cols.length) {
					insert_cols = Arrays.copyOf(insert_cols, count * 2);
					insert_types = Arrays.copyOf(insert_types, count * 2);
				}
				insert_cols[count] = rs.getObject("COLUMN_NAME").toString();
				insert_types[count] = rs.getInt("DATA_TYPE");
				count++;
			}
		}
		if (count == 0) {
			throw new SQLException("Table " + targetTable + " not found or has no columns");
		}
		numberOfColumns = count;
		if (!dbmd.supportsBatchUpdates()) {
			useBatch = false;
		}
	}

	/** Builds {@code insert into T(c1, c2) values( ?, ?);} for the known columns. */
	private String createPreparedStatementHead() throws SQLException {
		StringBuilder columns = new StringBuilder();
		StringBuilder placeholders = new StringBuilder();
		for (int i = 0; i < numberOfColumns; i++) {
			if (i > 0) {
				columns.append(", ");
				placeholders.append(", ");
			}
			columns.append(insert_cols[i]);
			placeholders.append('?');
		}
		return "insert into " + targetTable + "(" + columns + ") values( " + placeholders + ");";
	}

	/**
	 * Returns {@code myConnection}, or opens a new one from {@code propfile} if it is null (then
	 * also detecting PostgreSQL from the driver class). Caches the connection's metadata.
	 */
	public Connection getConnection(Connection myConnection, String propfile) throws Exception {
		if (myConnection == null) {
			SxConnection mySxConnection = new SxConnection();
			mySxConnection.setPropfile(propfile);
			myConnection = mySxConnection.getConnection();
			String db_driver = mySxConnection.m_DriverClass;
			if ("org.postgresql.Driver".equals(db_driver)) {
				isPostgres = true;
			}
		}
		dbmd = myConnection.getMetaData();
		return myConnection;
	}
}