Daniel Quathamer
2 years ago
5 changed files with 883 additions and 6 deletions
@ -0,0 +1,779 @@
@@ -0,0 +1,779 @@
|
||||
/* |
||||
* de.superx.etl - a package for controlling ETL routines |
||||
* Copyright (C) 2021 Daniel Quathamer <danielq@memtext.de> |
||||
* |
||||
* This package is licensed under the CampusSource License; |
||||
* http://www.campussource.de/org/license/
|
||||
*/ |
||||
package de.superx.etl; |
||||
|
||||
import java.io.BufferedReader; |
||||
import java.io.BufferedWriter; |
||||
import java.io.ByteArrayInputStream; |
||||
import java.io.File; |
||||
import java.io.FileInputStream; |
||||
import java.io.FileNotFoundException; |
||||
import java.io.FileOutputStream; |
||||
import java.io.FileReader; |
||||
import java.io.FileWriter; |
||||
import java.io.IOException; |
||||
import java.io.InputStreamReader; |
||||
import java.io.OutputStreamWriter; |
||||
import java.io.PrintStream; |
||||
import java.io.Reader; |
||||
import java.io.UnsupportedEncodingException; |
||||
import java.nio.charset.CodingErrorAction; |
||||
import java.nio.charset.StandardCharsets; |
||||
import java.nio.file.Files; |
||||
import java.sql.Connection; |
||||
import java.sql.DatabaseMetaData; |
||||
import java.sql.PreparedStatement; |
||||
import java.sql.ResultSet; |
||||
import java.sql.ResultSetMetaData; |
||||
import java.sql.SQLException; |
||||
import java.sql.Statement; |
||||
import java.sql.Types; |
||||
import java.text.ParseException; |
||||
import java.util.Iterator; |
||||
import java.util.Properties; |
||||
|
||||
import de.superx.etl.util.FileUtils; |
||||
import de.superx.util.SqlStringUtils; |
||||
|
||||
import javax.xml.parsers.DocumentBuilder; |
||||
import javax.xml.parsers.DocumentBuilderFactory; |
||||
import javax.xml.parsers.ParserConfigurationException; |
||||
import javax.xml.xpath.XPathFactory; |
||||
import javax.xml.xpath.XPath; |
||||
import javax.xml.xpath.XPathConstants; |
||||
|
||||
import org.w3c.dom.Document; |
||||
import org.w3c.dom.Node; |
||||
import org.w3c.dom.NodeList; |
||||
import org.xml.sax.*; |
||||
|
||||
|
||||
import org.postgresql.PGConnection; |
||||
import org.postgresql.copy.CopyManager; |
||||
|
||||
import de.superx.bin.SxConnection; |
||||
import de.superx.bin.SxJdbcClient; |
||||
import de.superx.bin.UploadRecords; |
||||
import de.memtext.util.DateUtils; |
||||
import de.memtext.util.GetOpts; |
||||
import de.memtext.util.StringUtils; |
||||
import de.memtext.util.XMLUtils; |
||||
public class TableUploader { |
||||
private String logfile; |
||||
private String dbpropfile; |
||||
private String mode="stop"; |
||||
private String inFormat; |
||||
private String targetTable; |
||||
private String srcFile; |
||||
private boolean header=false; |
||||
|
||||
private String delim="^"; |
||||
private String encoding=SqlStringUtils.getEncoding(); |
||||
private String inserts=""; |
||||
private boolean truncateTargetTable=true; |
||||
private boolean continueAfterError; |
||||
private boolean removeTrailingDelim=true; |
||||
private boolean isPostgres; |
||||
private boolean useBatch=true; |
||||
private static int maxCols=1000; |
||||
private String[] insert_cols = new String[maxCols]; |
||||
private int[] insert_types = new int[maxCols]; |
||||
private int numberOfColumns; |
||||
public long numberOfRows; |
||||
private int returnCode; |
||||
private String xml_search_path; |
||||
private Connection uploadConnection; |
||||
private DatabaseMetaData dbmd; |
||||
private PreparedStatement pst; |
||||
|
||||
|
||||
public TableUploader() { |
||||
// TODO Auto-generated constructor stub
|
||||
|
||||
} |
||||
|
||||
public Connection getUploadConnection() { |
||||
return uploadConnection; |
||||
} |
||||
public void setUploadConnection(Connection uploadConnection) { |
||||
this.uploadConnection = uploadConnection; |
||||
} |
||||
public boolean isRemoveTrailingDelim() { |
||||
return removeTrailingDelim; |
||||
} |
||||
public void setRemoveTrailingDelim(boolean removeTrailingDelim) { |
||||
this.removeTrailingDelim = removeTrailingDelim; |
||||
} |
||||
public void setHeader(boolean header) { |
||||
this.header = header; |
||||
} |
||||
|
||||
|
||||
public String getDbpropfile() { |
||||
return dbpropfile; |
||||
} |
||||
public void setDbpropfile(String dbpropfile) { |
||||
this.dbpropfile = dbpropfile; |
||||
} |
||||
public String getMode() { |
||||
return mode; |
||||
} |
||||
public void setMode(String mode) { |
||||
if (!mode.equals("stop") && !mode.equals("exclude-field")&& !mode.equals("transaction")) |
||||
mode = "exclude-row"; |
||||
this.mode = mode; |
||||
} |
||||
public String getInFormat() { |
||||
return inFormat; |
||||
} |
||||
public void setInFormat(String inFormat) { |
||||
this.inFormat = inFormat; |
||||
} |
||||
public String getTargetTable() { |
||||
return targetTable; |
||||
} |
||||
public void setTargetTable(String targetTable) { |
||||
this.targetTable = targetTable; |
||||
} |
||||
public String getSrcFile() { |
||||
return srcFile; |
||||
} |
||||
public void setSrcFile(String srcFile) { |
||||
this.srcFile = srcFile; |
||||
} |
||||
|
||||
public String getDelim() { |
||||
return delim; |
||||
} |
||||
public void setDelim(String delim) { |
||||
if (delim.equals("tab")) |
||||
delim = "\t"; //Tab
|
||||
if (delim.equals("")) |
||||
delim = "^"; //default Delimiter
|
||||
this.delim = delim; |
||||
} |
||||
public String getEncoding() { |
||||
return encoding; |
||||
} |
||||
public void setEncoding(String encoding) { |
||||
if(encoding==null || encoding.equals("")) |
||||
encoding="UTF-8"; |
||||
this.encoding = encoding; |
||||
} |
||||
public String getInserts() { |
||||
return inserts; |
||||
} |
||||
public void setInserts(String inserts) { |
||||
if(inserts.equalsIgnoreCase("batch")) |
||||
useBatch=true; |
||||
if(inserts.equalsIgnoreCase("simple")) |
||||
useBatch=false; |
||||
|
||||
this.inserts = inserts; |
||||
} |
||||
public boolean isTruncateTargetTable() { |
||||
return truncateTargetTable; |
||||
} |
||||
|
||||
public void setTruncateTargetTable(boolean truncateTargetTable) { |
||||
this.truncateTargetTable = truncateTargetTable; |
||||
} |
||||
public void setTruncateTargetTable(String truncateTargetTable) { |
||||
if(truncateTargetTable!=null) |
||||
{if(truncateTargetTable.equalsIgnoreCase("true")) |
||||
this.truncateTargetTable =true; |
||||
else |
||||
this.truncateTargetTable =false; |
||||
} |
||||
else |
||||
this.truncateTargetTable =false; |
||||
} |
||||
|
||||
public int getReturnCode() { |
||||
return returnCode; |
||||
} |
||||
|
||||
public void setReturnCode(int returnCode) { |
||||
this.returnCode = returnCode; |
||||
} |
||||
|
||||
public boolean isContinueAfterError() { |
||||
return continueAfterError; |
||||
} |
||||
public void setContinueAfterError(boolean continueAfterError) { |
||||
this.continueAfterError = continueAfterError; |
||||
} |
||||
public String getXml_search_path() { |
||||
return xml_search_path; |
||||
} |
||||
|
||||
public void setXml_search_path(String xml_search_path) { |
||||
this.xml_search_path = xml_search_path; |
||||
} |
||||
|
||||
public long uploadFile() throws Exception |
||||
{ |
||||
String protokoll=""; |
||||
long numberOfRows=0; |
||||
returnCode=0; |
||||
try { |
||||
//dbmd=uploadConnection.getMetaData();
|
||||
String dbname=uploadConnection.getCatalog(); |
||||
if(truncateTargetTable) |
||||
{ |
||||
Statement stm=uploadConnection.createStatement(); |
||||
stm.execute("delete from "+this.targetTable+";"); |
||||
stm.close(); |
||||
} |
||||
if(inFormat.equalsIgnoreCase("xml")) |
||||
{ |
||||
numberOfRows=uploadXML(); |
||||
} |
||||
else |
||||
numberOfRows=uploadCSV(); |
||||
} catch (Exception e) { |
||||
returnCode=1; |
||||
throw new Exception(e); |
||||
} |
||||
return numberOfRows; |
||||
|
||||
} |
||||
private long uploadXML() throws Exception |
||||
{ |
||||
String feld_wert; |
||||
String errmsg=""; |
||||
Document mydomres=null; |
||||
numberOfRows=0; |
||||
org.xml.sax.InputSource is; |
||||
NodeList rowlist; |
||||
|
||||
mydomres = de.superx.etl.EtlUtils.buildDocumentFromXmlFile(srcFile); |
||||
|
||||
if(xml_search_path==null) |
||||
{ |
||||
rowlist = mydomres.getElementsByTagName("row"); |
||||
} |
||||
else |
||||
|
||||
{ |
||||
//XPath xPath = XPathFactory.newInstance().newXPath();
|
||||
File inputFile = new File(srcFile); |
||||
//XPathFactory factory = XPathFactory.newInstance();
|
||||
XPathFactory factory = new net.sf.saxon.xpath.XPathFactoryImpl(); |
||||
|
||||
XPath xPath = factory.newXPath(); |
||||
//Document doc = builder.parse(inputFile);
|
||||
//doc.getDocumentElement().normalize();
|
||||
rowlist=(NodeList) xPath.compile(xml_search_path).evaluate( |
||||
mydomres, XPathConstants.NODESET); |
||||
} |
||||
Node rownode; |
||||
initializeColumnSchema(); |
||||
String insertHead=createPreparedStatementHead(); |
||||
pst = uploadConnection.prepareStatement(insertHead); |
||||
if(useBatch) |
||||
pst.clearBatch(); |
||||
int anz_rows = rowlist.getLength(); |
||||
for (int zeilennr = 0; zeilennr < anz_rows; zeilennr++) { |
||||
//Schleife über jede Zeile des XML-Stroms
|
||||
rownode = rowlist.item(zeilennr); |
||||
//pst.clearParameters();
|
||||
|
||||
for(int col=0; col < numberOfColumns;col++) |
||||
{ |
||||
for (Iterator it = XMLUtils.getChildNodeIterator(rownode); it.hasNext();) { |
||||
Node fldNode = (Node) it.next(); |
||||
//System.out.println(XMLUtils.getTheValue(fldNode));
|
||||
if (XMLUtils.getAttribValue(fldNode,"name").equalsIgnoreCase(insert_cols[col])) { |
||||
//int p;
|
||||
feld_wert=""; |
||||
try{ |
||||
feld_wert = XMLUtils.getTheValue(fldNode).trim(); |
||||
feld_wert=StringUtils.replace(feld_wert, |
||||
"CDATASTART", "<![CDATA["); |
||||
feld_wert = StringUtils.replace(feld_wert, |
||||
"CDATAEND", "]]>"); |
||||
|
||||
|
||||
} |
||||
catch (IllegalArgumentException e) |
||||
{ |
||||
//Node ist NULL, keine Warnung notwendig
|
||||
} |
||||
|
||||
errmsg = feld_wert_to_pst(zeilennr, col, errmsg, feld_wert); |
||||
|
||||
} //Wenn Feldname übereinstimmt
|
||||
} |
||||
} //Ende der Schleife über die Spalten
|
||||
if(!errmsg.equals("") && mode.equals("stop")) |
||||
{ |
||||
break; |
||||
} |
||||
if(useBatch) |
||||
pst.addBatch(); |
||||
else |
||||
pst.executeUpdate(); |
||||
|
||||
numberOfRows++; |
||||
|
||||
} //Ende der Schleife über die Zeilen
|
||||
if(useBatch) |
||||
pst.executeBatch(); |
||||
return numberOfRows; |
||||
} |
||||
|
||||
|
||||
|
||||
private long uploadCSV() throws Exception |
||||
{ |
||||
String line; |
||||
String line2; |
||||
File outFile=null; |
||||
String protokoll=""; |
||||
long numberOfRows=0; |
||||
if(isPostgres && !inserts.equalsIgnoreCase("simple") && !inserts.equalsIgnoreCase("batch")) |
||||
{ |
||||
if(removeTrailingDelim) |
||||
srcFile=removeTrailingDelim(srcFile); |
||||
numberOfRows=uploadCSVinPostgres(srcFile,removeTrailingDelim); |
||||
} |
||||
else |
||||
numberOfRows=uploadCSVwithAnsiSQL(srcFile); |
||||
return numberOfRows; |
||||
|
||||
|
||||
} |
||||
private String removeTrailingDelim(String srcFile) throws UnsupportedEncodingException, FileNotFoundException, IOException { |
||||
String line; |
||||
File outFile; |
||||
String returnSrcFile=srcFile+".tmp"; |
||||
BufferedReader in2 = new BufferedReader(new InputStreamReader(new FileInputStream(srcFile), encoding)); |
||||
|
||||
outFile=new File(srcFile+".tmp"); |
||||
FileOutputStream out = new FileOutputStream(outFile, false); |
||||
PrintStream out2 = new PrintStream(out, true, encoding); |
||||
|
||||
|
||||
while ((line = in2.readLine()) != null) { |
||||
|
||||
if (line.endsWith(delim)) |
||||
line=line.substring(0,line.length()-delim.length()); |
||||
out2.println(line); |
||||
out2.flush(); |
||||
|
||||
} |
||||
return returnSrcFile; |
||||
} |
||||
private long uploadCSVinPostgres(String srcFile, boolean deleteSrcFile) { |
||||
long numOfRows=0; |
||||
String copySql = "COPY " + targetTable + " FROM STDIN WITH DELIMITER '" + delim + "' NULL '' ENCODING '"+ encoding+"'"; |
||||
copySql += header ? " HEADER" : ""; |
||||
|
||||
String srcFileContent=de.superx.etl.EtlUtils.getFileContentsWithEncoding(srcFile, encoding); |
||||
String msg=""; |
||||
try { |
||||
//dbmd=uploadConnection.getMetaData();
|
||||
String dbname=uploadConnection.getCatalog(); |
||||
Statement stm=uploadConnection.createStatement(); |
||||
int isIso=0; |
||||
ResultSet rs=stm.executeQuery("SELECT distinct 1 FROM pg_catalog.pg_database where datname='"+dbname+"' and datctype ilike '%euro%' or datctype ilike '%1252%' or datctype ilike '%8859%';"); |
||||
while (rs.next()) { |
||||
if(rs.getObject(1)!=null) |
||||
isIso= Integer.parseInt(rs.getObject(1).toString()); |
||||
|
||||
} |
||||
rs.close(); |
||||
stm.close(); |
||||
Reader in4=null; |
||||
final CopyManager cpm = ((PGConnection) uploadConnection).getCopyAPI(); |
||||
long anz = 0; |
||||
msg = ""; |
||||
|
||||
if(isIso==1) |
||||
{ |
||||
String srcFileIso=srcFile+"_iso.tmp"; |
||||
String srcFileContentValidIso = FileUtils.convertToIso(srcFileContent,"postgres") ;//new String(srcFileContent.getBytes("ISO-8859-1"));
|
||||
de.superx.etl.EtlUtils.saveFileContentsWithEncoding(srcFileIso, srcFileContentValidIso, "iso-8859-9"); |
||||
FileInputStream fis = new FileInputStream(srcFileIso); |
||||
in4 = new BufferedReader(new InputStreamReader(fis, "iso-8859-9")); |
||||
|
||||
} |
||||
else |
||||
{ |
||||
FileReader in3 = new FileReader(srcFile); |
||||
in4 = new BufferedReader(in3); |
||||
|
||||
} |
||||
|
||||
numOfRows= cpm.copyIn(copySql, in4); |
||||
numberOfRows =numOfRows; |
||||
if(deleteSrcFile) |
||||
{ |
||||
File outFile=new File(srcFile); |
||||
if(outFile!=null) |
||||
outFile.delete(); |
||||
} |
||||
} catch (Exception e) { |
||||
// TODO Auto-generated catch block
|
||||
msg=e.toString(); |
||||
} |
||||
return numOfRows; |
||||
} |
||||
|
||||
private long uploadCSVwithAnsiSQL(String srcFile) throws SQLException, FileNotFoundException, IOException { |
||||
numberOfRows=0; |
||||
String text; |
||||
String text2; |
||||
String msg=""; |
||||
int zeilennr=1; |
||||
int fehlerSaetze=0; |
||||
|
||||
BufferedReader in = new BufferedReader(new InputStreamReader(new FileInputStream(srcFile), encoding)); |
||||
initializeColumnSchema(); |
||||
String insertHead=createPreparedStatementHead(); |
||||
pst = uploadConnection.prepareStatement(insertHead); |
||||
if(useBatch) |
||||
pst.clearBatch(); |
||||
|
||||
while ((text = in.readLine()) != null) { |
||||
if (text.endsWith("\\")) { |
||||
text=text.substring(0, text.length()-1); |
||||
text2 = in.readLine(); |
||||
if (text2 != null) { |
||||
text += "\n"+ text2; |
||||
while (text2.endsWith("\\")) { |
||||
text=text.substring(0, text.length()-1); |
||||
text2 = in.readLine(); |
||||
if (text2 != null) |
||||
text += "\n"+text2; |
||||
|
||||
} |
||||
} |
||||
} |
||||
|
||||
String prepare = |
||||
createPreparedInsertStatement(zeilennr, |
||||
insertHead, |
||||
text); |
||||
if(!prepare.equals("") && mode.equals("stop")) |
||||
{ |
||||
msg=prepare; |
||||
break; |
||||
} |
||||
if(useBatch) |
||||
pst.addBatch(); |
||||
else |
||||
pst.executeUpdate(); |
||||
numberOfRows++; |
||||
|
||||
|
||||
|
||||
} |
||||
if(useBatch) |
||||
pst.executeBatch(); |
||||
//TODO: msg
|
||||
return numberOfRows; |
||||
} |
||||
|
||||
private String createPreparedInsertStatement( |
||||
int line, |
||||
String insertHead, |
||||
String text) |
||||
throws SQLException { |
||||
int p; |
||||
int i=0; |
||||
int k=0; |
||||
String errmsg = ""; |
||||
String feld_wert; |
||||
//pst.clearParameters();
|
||||
do { |
||||
//ggf. Trennzeichen am Ende hinzufügen:
|
||||
if(!text.endsWith(delim)) |
||||
text+= delim; |
||||
p = text.indexOf(delim, i); |
||||
//logger.config("Type "+types[k]);
|
||||
//maskierte Trennzeichen abfangen:
|
||||
if(p>0 && text.substring(p-1, p).equals("\\")) |
||||
p = text.indexOf(delim, p+1); |
||||
|
||||
if (p > -1 ) { |
||||
if(p==-1) |
||||
feld_wert = text.substring(i); |
||||
else |
||||
feld_wert = text.substring(i, p); |
||||
//wenn der Feldwert zufällig das Zeichen "\\n" enthält, wird es zu "\n"
|
||||
if(feld_wert != null && (feld_wert.indexOf("\\\\n") >0 )) |
||||
{ |
||||
feld_wert=de.memtext.util.StringUtils.replace(feld_wert, "\\\\n", "\\n"); |
||||
} |
||||
//wenn der Feldwert das Zeichen "\Trennzeichen" enthält, wird der \ entfernt
|
||||
if(feld_wert != null && (feld_wert.indexOf("\\"+delim) >0 )) |
||||
{ |
||||
feld_wert=de.memtext.util.StringUtils.replace(feld_wert, "\\", ""); |
||||
} |
||||
//wenn der Feldwert das Zeichen "\\" enthält, wird ein \ entfernt
|
||||
if(feld_wert != null && (feld_wert.indexOf("\\\\") >0 )) |
||||
{ |
||||
feld_wert=de.memtext.util.StringUtils.replace(feld_wert, "\\\\", "\\"); |
||||
} |
||||
|
||||
errmsg = feld_wert_to_pst(line,k, errmsg, feld_wert); |
||||
k++; |
||||
i = p + 1; |
||||
} |
||||
|
||||
} while (p > -1); |
||||
return errmsg; |
||||
} |
||||
private String feld_wert_to_pst(int line, int col, String errmsg, String feld_wert) throws SQLException { |
||||
|
||||
|
||||
if( col >= numberOfColumns) |
||||
errmsg+= "Anzahl Spalten in Datei ist "+col+", aber es sollten nur "+(numberOfColumns-1)+" Spalten sein. Bitte prüfen Sie das Trennzeichen"; |
||||
else |
||||
{ |
||||
if (feld_wert.equals("")) |
||||
try { |
||||
pst.setNull(col + 1, insert_types[col]); |
||||
} catch (SQLException e1) { |
||||
errmsg += e1.toString(); |
||||
} else { |
||||
|
||||
switch (insert_types[col]) { |
||||
case Types.BIGINT : |
||||
case Types.TINYINT : |
||||
case Types.SMALLINT : |
||||
case Types.INTEGER : |
||||
|
||||
try { |
||||
int myInt = (int) Integer.parseInt(feld_wert.trim()); |
||||
pst.setInt(col + 1, myInt); |
||||
} catch (NumberFormatException e1) { |
||||
errmsg += e1.toString(); |
||||
setFieldToNull(col, insert_types, pst); |
||||
} catch (SQLException e1) { |
||||
errmsg += conversionException(line, col, feld_wert,e1.toString()); |
||||
setFieldToNull(col, insert_types, pst); |
||||
} |
||||
break; |
||||
case Types.FLOAT : |
||||
try { |
||||
float myFloat = |
||||
(float) Float.parseFloat(feld_wert.trim()); |
||||
pst.setFloat(col + 1, myFloat); |
||||
} catch (NumberFormatException e1) { |
||||
errmsg += conversionException(line, col, feld_wert,e1.toString()); |
||||
setFieldToNull(col, insert_types, pst); |
||||
} catch (SQLException e1) { |
||||
errmsg += conversionException(line, col, feld_wert,e1.toString()); |
||||
setFieldToNull(col, insert_types, pst); |
||||
} |
||||
break; |
||||
|
||||
case Types.REAL : |
||||
case Types.DOUBLE : |
||||
case Types.NUMERIC : |
||||
case Types.DECIMAL : |
||||
try { |
||||
double myDouble = |
||||
(double) Double.parseDouble(feld_wert.trim()); |
||||
pst.setDouble(col + 1, myDouble); |
||||
} catch (NumberFormatException e1) { |
||||
errmsg += conversionException(line, col, feld_wert,e1.toString()); |
||||
setFieldToNull(col, insert_types, pst); |
||||
} catch (SQLException e1) { |
||||
errmsg += conversionException(line, col, feld_wert, e1.toString()); |
||||
setFieldToNull(col, insert_types, pst); |
||||
} |
||||
break; |
||||
|
||||
case Types.CHAR : |
||||
case Types.VARCHAR : |
||||
default : |
||||
if(feld_wert.equals(" ")) |
||||
feld_wert=""; //Leerzeichen im UNL-File wird zu Leerstring
|
||||
try { |
||||
pst.setString(col + 1, feld_wert); |
||||
} catch (SQLException e1) { |
||||
errmsg += conversionException(line, col, feld_wert,e1.toString()); |
||||
setFieldToNull(col, insert_types, pst); |
||||
} |
||||
break; |
||||
case Types.LONGVARCHAR : |
||||
ByteArrayInputStream by = |
||||
new ByteArrayInputStream(feld_wert.getBytes()); |
||||
pst.setAsciiStream( |
||||
col + 1, |
||||
by, |
||||
feld_wert.length()); |
||||
break; |
||||
case Types.DATE : |
||||
try { |
||||
java.util.Date datum = |
||||
DateUtils.parse(feld_wert.trim()); |
||||
feld_wert = DateUtils.formatUS(datum); |
||||
//Leider ist dieser Schritt wg java.sql.Date nötig
|
||||
pst.setDate( |
||||
col + 1, |
||||
java.sql.Date.valueOf(feld_wert)); |
||||
|
||||
} catch (SQLException e1) { |
||||
errmsg += conversionException(line, col, feld_wert, e1.toString()); |
||||
setFieldToNull(col, insert_types, pst); |
||||
} catch (ParseException e1) { |
||||
errmsg += conversionException(line, col, feld_wert, e1.toString()); |
||||
setFieldToNull(col, insert_types, pst); |
||||
} |
||||
catch (IllegalArgumentException e1) { |
||||
errmsg += conversionException(line, col, feld_wert, e1.toString()); |
||||
setFieldToNull(col, insert_types, pst); |
||||
} |
||||
|
||||
break; |
||||
case Types.TIME : |
||||
|
||||
try { |
||||
//Time zeit = (java.sql.Time)
|
||||
//DateUtils.timeParse(feld_wert);
|
||||
pst.setTime(col + 1, java.sql.Time.valueOf( |
||||
feld_wert.trim())); |
||||
} catch (SQLException e1) { |
||||
errmsg += conversionException(line, col, feld_wert, e1.toString()); |
||||
setFieldToNull(col, insert_types, pst); |
||||
} |
||||
catch (IllegalArgumentException e1) { |
||||
errmsg += conversionException(line, col, feld_wert, e1.toString()); |
||||
setFieldToNull(col, insert_types, pst); |
||||
} |
||||
|
||||
break; |
||||
case Types.TIMESTAMP : |
||||
try { |
||||
java.util.Date datum = |
||||
DateUtils.dateTimeParse(feld_wert.trim()); |
||||
feld_wert = DateUtils.dateTimeFormatUS(datum); |
||||
//Leider ist dieser Schritt wg java.sql.Date nötig
|
||||
pst.setTimestamp( |
||||
col + 1, |
||||
java.sql.Timestamp.valueOf( |
||||
feld_wert + ".0")); |
||||
|
||||
} catch (SQLException e1) { |
||||
errmsg += conversionException(line, col,feld_wert, e1.toString()); |
||||
setFieldToNull(col, insert_types, pst); |
||||
} catch (ParseException e1) { |
||||
errmsg += conversionException(line, col, feld_wert, e1.toString()); |
||||
setFieldToNull(col, insert_types, pst); |
||||
} |
||||
catch (IllegalArgumentException e1) { |
||||
errmsg += conversionException(line, col, feld_wert, e1.toString()); |
||||
setFieldToNull(col, insert_types, pst); |
||||
} |
||||
|
||||
break; |
||||
|
||||
case Types.BIT : |
||||
// Types.BOOLEAN gibt es im jdk 1.3 nicht
|
||||
try { |
||||
boolean wf = |
||||
(boolean) Boolean.getBoolean(feld_wert.trim()); |
||||
pst.setBoolean(col + 1, wf); |
||||
} catch (SQLException e1) { |
||||
errmsg += conversionException(line, col, feld_wert, e1.toString()); |
||||
setFieldToNull(col, insert_types, pst); |
||||
} |
||||
//Boolean wird vom Informix-Treiber als OTHER (1111) erkannt
|
||||
//Da aber default '' ist, klappt es trotzdem
|
||||
break; |
||||
} |
||||
|
||||
} |
||||
} |
||||
return errmsg; |
||||
} |
||||
private void setFieldToNull( |
||||
int k, |
||||
int[] insert_types, |
||||
PreparedStatement pst) { |
||||
if (mode.equals("exclude-field")) |
||||
try { |
||||
pst.setNull(k + 1, insert_types[k]); |
||||
} catch (SQLException e3) { |
||||
System.err.println("Invalid Field " + (k + 1) + " could not be set to null"); |
||||
} |
||||
|
||||
} |
||||
private String conversionException(int line,int col, String field_value, String error) { |
||||
String err_msg = ""; |
||||
|
||||
err_msg = "Error in line "+line+" in Column " + (col + 1) + " "+insert_cols[col]+" value "+ field_value+ ": " + error.toString() + "; "; |
||||
|
||||
return err_msg; |
||||
} |
||||
private void initializeColumnSchema() throws SQLException |
||||
{ |
||||
int i=0; |
||||
|
||||
ResultSet rs = null; |
||||
ResultSetMetaData rsmd = null; |
||||
String tabelle=targetTable; |
||||
if (!dbmd.storesLowerCaseIdentifiers()) |
||||
tabelle = tabelle.toUpperCase(); |
||||
rs =dbmd.getColumns(uploadConnection.getCatalog(), null, tabelle, null); |
||||
rsmd = rs.getMetaData(); |
||||
while (rs.next()) { |
||||
insert_cols[i] = rs.getObject("COLUMN_NAME").toString(); |
||||
insert_types[i] = rs.getInt("DATA_TYPE"); |
||||
i++; |
||||
} |
||||
numberOfColumns=i; |
||||
if(!dbmd.supportsBatchUpdates()) |
||||
useBatch=false; |
||||
|
||||
} |
||||
private String createPreparedStatementHead() throws SQLException |
||||
{ |
||||
|
||||
String sql=null; |
||||
|
||||
String insert_head = "insert into " + targetTable+"("; |
||||
String insert_val=""; |
||||
for (int i = 0; i < numberOfColumns; i++) |
||||
{ |
||||
insert_head += insert_cols[i] + ", "; |
||||
insert_val+="?, "; |
||||
} |
||||
insert_head = insert_head.substring(0, insert_head.length() - 2); |
||||
insert_val = insert_val.substring(0, insert_val.length() - 2); |
||||
insert_head +=") values( "; |
||||
sql=insert_head + insert_val+");"; |
||||
return sql; |
||||
|
||||
} |
||||
public Connection getConnection(Connection myConnection,String propfile) throws Exception { |
||||
|
||||
if(myConnection==null) |
||||
{ |
||||
SxConnection mySxConnection = null; |
||||
mySxConnection = new SxConnection(); |
||||
mySxConnection.setPropfile(propfile); |
||||
|
||||
myConnection = mySxConnection.getConnection(); |
||||
|
||||
String db_driver = mySxConnection.m_DriverClass; |
||||
if(db_driver.equals("org.postgresql.Driver")) |
||||
isPostgres=true; |
||||
} |
||||
dbmd = myConnection.getMetaData(); |
||||
|
||||
return myConnection; |
||||
|
||||
} |
||||
|
||||
} |
@ -0,0 +1,98 @@
@@ -0,0 +1,98 @@
|
||||
package de.superx.etl.util; |
||||
|
||||
import java.io.BufferedReader; |
||||
import java.io.BufferedWriter; |
||||
import java.io.File; |
||||
import java.io.FileInputStream; |
||||
import java.io.FileNotFoundException; |
||||
import java.io.FileOutputStream; |
||||
import java.io.IOException; |
||||
import java.io.InputStreamReader; |
||||
import java.io.OutputStreamWriter; |
||||
import java.io.StringWriter; |
||||
import java.io.UnsupportedEncodingException; |
||||
|
||||
public class FileUtils { |
||||
/* |
||||
* Liest eine Datei mit vorgegebenem Encoding ein in einen String. |
||||
* Wenn Datei nicht existiert, wird NULL zurückgegeben, |
||||
* Wenn sie leer ist, kommt Leerstring zurück |
||||
* @author Quathamer |
||||
* @param filePath |
||||
* @param encoding (Default ist utf-8) |
||||
*/ |
||||
|
||||
public static String getFileContentsWithEncoding(String filePath, String encoding) { |
||||
File f = new File(filePath); |
||||
if (!f.exists()) { |
||||
System.out.println("Fehler: Datei " + filePath + " existiert nicht."); |
||||
return null; |
||||
} |
||||
String fileContents = ""; |
||||
if (encoding == null || encoding.trim().equals("")) { |
||||
encoding = System.getProperty("file.encoding"); |
||||
} |
||||
try { |
||||
// --- IputStream und OutputStream generieren ---//
|
||||
FileInputStream fis = new FileInputStream(f); |
||||
// Wenn Quelldatei Unicode, dann speziellen Reader nutzen
|
||||
BufferedReader in; |
||||
//BufferedReader ist schneller bei großen Dateien
|
||||
in = new BufferedReader(new InputStreamReader(fis, encoding)); |
||||
// --- Output-Stream der temporären Datei erzeugen ---//
|
||||
StringWriter out = new StringWriter(); |
||||
// --- Verarbeiten der Datei ---//
|
||||
String text; |
||||
text = in.readLine(); |
||||
while (text != null) { // Datei nicht leer
|
||||
out.write(text); |
||||
out.write(System.getProperty("line.separator")); |
||||
text = in.readLine(); |
||||
} |
||||
if (!(out == null)) { |
||||
fileContents = out.toString(); |
||||
} |
||||
} catch (FileNotFoundException e) { |
||||
// TODO Auto-generated catch block
|
||||
e.printStackTrace(); |
||||
} catch (UnsupportedEncodingException e) { |
||||
// TODO Auto-generated catch block
|
||||
e.printStackTrace(); |
||||
} catch (IOException e) { |
||||
// TODO Auto-generated catch block
|
||||
e.printStackTrace(); |
||||
} |
||||
return fileContents; |
||||
} |
||||
|
||||
public static void saveFileContentsWithEncoding(String filename, String contents, String encoding) throws |
||||
|
||||
FileNotFoundException, |
||||
IOException |
||||
{ |
||||
|
||||
|
||||
File f = new File(filename); |
||||
BufferedReader in; |
||||
BufferedWriter out; |
||||
|
||||
//Default encoding ist utf-8
|
||||
if (encoding == null) encoding = System.getProperty("file.encoding"); |
||||
// --- Output-Stream der temporären Datei erzeugen ---//
|
||||
out = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(f), encoding)); |
||||
|
||||
out.write(contents); |
||||
|
||||
out.close(); |
||||
|
||||
|
||||
}//Ende der Methode
|
||||
public static String convertToIso(String inp, String dbsystem) throws java.io.UnsupportedEncodingException |
||||
{ |
||||
String outp = new String(inp.getBytes("ISO-8859-1")); |
||||
if (dbsystem.equalsIgnoreCase("postgres")) { |
||||
outp = outp.replaceAll("´", " "); |
||||
} |
||||
return outp; |
||||
} |
||||
} |
Loading…
Reference in new issue