Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,11 @@ public Schema getSchema(ResultSetMetaData metadata, int index) throws SQLExcepti
metadata.isSigned(index), true);
}

public Schema getSchema(String typeName, int sqlType, int precision, int scale, String columnName,
boolean isSigned) throws SQLException {
return DBUtils.getSchema(typeName, sqlType, precision, scale, columnName, isSigned, true);
}

@Override
public boolean shouldIgnoreColumn(ResultSetMetaData metadata, int index) throws SQLException {
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,14 @@ protected void handleField(ResultSet resultSet, StructuredRecord.Builder recordB
protected void setField(ResultSet resultSet, StructuredRecord.Builder recordBuilder, Schema.Field field,
int columnIndex, int sqlType, int sqlPrecision, int sqlScale) throws SQLException {
Object o = DBUtils.transformValue(sqlType, sqlPrecision, sqlScale, resultSet, columnIndex);
if (o instanceof Date) {
setFieldValue(recordBuilder, field, o);
}

protected void setFieldValue(StructuredRecord.Builder recordBuilder, Schema.Field field, Object o)
throws SQLException {
if (o == null) {
recordBuilder.set(field.getName(), null);
} else if (o instanceof Date) {
recordBuilder.setDate(field.getName(), ((Date) o).toLocalDate());
} else if (o instanceof Time) {
recordBuilder.setTime(field.getName(), ((Time) o).toLocalTime());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,17 @@
import java.io.InputStream;
import java.lang.reflect.InvocationTargetException;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.nio.ByteBuffer;
import java.sql.Blob;
import java.sql.Clob;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.SQLXML;
import java.sql.Struct;
import java.sql.Timestamp;
import java.sql.Types;
import java.time.LocalDateTime;
Expand All @@ -43,6 +48,8 @@
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
* Oracle Source implementation {@link org.apache.hadoop.mapreduce.lib.db.DBWritable} and
Expand Down Expand Up @@ -106,13 +113,85 @@ record = recordBuilder.build();
@Override
protected void handleField(ResultSet resultSet, StructuredRecord.Builder recordBuilder, Schema.Field field,
int columnIndex, int sqlType, int sqlPrecision, int sqlScale) throws SQLException {
if (OracleSourceSchemaReader.ORACLE_TYPES.contains(sqlType) || sqlType == Types.NCLOB) {
if (OracleSourceSchemaReader.ORACLE_TYPES.contains(sqlType) || sqlType == Types.NCLOB || sqlType == Types.STRUCT) {
handleOracleSpecificType(resultSet, recordBuilder, field, columnIndex, sqlType, sqlPrecision, sqlScale);
} else {
setField(resultSet, recordBuilder, field, columnIndex, sqlType, sqlPrecision, sqlScale);
}
}

@Override
protected void setFieldValue(StructuredRecord.Builder recordBuilder, Schema.Field field, Object attrValue)
throws SQLException {
if (attrValue == null) {
recordBuilder.set(field.getName(), null);
return;
}

Schema fieldSchema = field.getSchema().isNullable() ? field.getSchema().getNonNullable()
: field.getSchema();
String attrClassName = attrValue.getClass().getName();
if (attrValue instanceof Struct) {
recordBuilder.set(field.getName(), convertStructToRecord((Struct) attrValue, fieldSchema));
return;
}
if (attrValue instanceof Clob) {
Clob clob = (Clob) attrValue;
recordBuilder.set(field.getName(), clob.getSubString(1, (int) clob.length()));
return;
}
if (attrValue instanceof Blob) {
Blob blob = (Blob) attrValue;
recordBuilder.set(field.getName(), blob.getBytes(1, (int) blob.length()));
return;
}
if (attrValue instanceof SQLXML) {
recordBuilder.set(field.getName(), ((SQLXML) attrValue).getString());
return;
}
if ("oracle.sql.INTERVALDS".equals(attrClassName) || "oracle.sql.INTERVALYM".equals(attrClassName)) {
recordBuilder.set(field.getName(), attrValue.toString());
return;
}
if (attrValue instanceof BigDecimal) {
populateDecimalValue(attrValue, fieldSchema, recordBuilder, field);
return;
}

if (attrValue instanceof Timestamp) {
Timestamp timestamp = (Timestamp) attrValue;
if (Schema.LogicalType.DATETIME.equals(fieldSchema.getLogicalType())) {
recordBuilder.setDateTime(field.getName(), timestamp.toLocalDateTime());
} else {
super.setFieldValue(recordBuilder, field, attrValue);
}
return;
}
if (attrValue instanceof OffsetDateTime) {
ZonedDateTime zonedDateTime = ((OffsetDateTime) attrValue).atZoneSameInstant(ZoneId.of("UTC"));

if (fieldSchema.getLogicalType() != null &&
(Schema.LogicalType.TIMESTAMP_MICROS.equals(fieldSchema.getLogicalType()) ||
Schema.LogicalType.TIMESTAMP_MILLIS.equals(fieldSchema.getLogicalType()))) {
recordBuilder.setTimestamp(field.getName(), zonedDateTime);
} else {
recordBuilder.set(field.getName(), zonedDateTime.toString());
}
return;
}

ClassLoader oracleLoader = attrValue.getClass().getClassLoader();
try {
if (oracleLoader != null && oracleLoader.loadClass("oracle.jdbc.OracleBfile").isInstance(attrValue)) {
recordBuilder.set(field.getName(), getBfileBytes(attrValue, field.getName()));
return;
}
} catch (ClassNotFoundException e) {
throw new RuntimeException(e);
}
super.setFieldValue(recordBuilder, field, attrValue);
}

@Override
protected void writeNonNullToDB(PreparedStatement stmt, Schema fieldSchema,
String fieldName, int fieldIndex) throws SQLException {
Expand Down Expand Up @@ -169,6 +248,34 @@ protected void writeNonNullToDB(PreparedStatement stmt, Schema fieldSchema,
}
}

private void populateDecimalValue(Object attrValue, Schema fieldSchema,
StructuredRecord.Builder recordBuilder, Schema.Field field) {
BigDecimal bigDecimal = (BigDecimal) attrValue;
if (Schema.LogicalType.DECIMAL.equals(fieldSchema.getLogicalType())) {
recordBuilder.setDecimal(field.getName(), bigDecimal.setScale(fieldSchema.getScale(), RoundingMode.HALF_UP));
return;
}
switch (fieldSchema.getType()) {
case DOUBLE:
recordBuilder.set(field.getName(), bigDecimal.doubleValue());
break;
case FLOAT:
recordBuilder.set(field.getName(), bigDecimal.floatValue());
break;
case INT:
recordBuilder.set(field.getName(), bigDecimal.intValue());
break;
case LONG:
recordBuilder.set(field.getName(), bigDecimal.longValue());
break;
case STRING:
recordBuilder.set(field.getName(), bigDecimal.toString());
break;
default:
recordBuilder.set(field.getName(), bigDecimal);
}
}

/**
* Creates an instance of 'oracle.sql.TIMESTAMPTZ' which corresponds to the specified timestamp with time zone string.
* @param connection sql connection.
Expand Down Expand Up @@ -232,11 +339,15 @@ private Object createOracleTimestamp(Connection connection, String timestampStri
*/
private byte[] getBfileBytes(ResultSet resultSet, String columnName) throws SQLException {
Object bfile = resultSet.getObject(columnName);
return getBfileBytes(bfile, columnName);
}

public byte[] getBfileBytes(Object bfile, String columnName) {
if (bfile == null) {
return null;
}
try {
ClassLoader classLoader = resultSet.getClass().getClassLoader();
ClassLoader classLoader = bfile.getClass().getClassLoader();
Class<?> oracleBfileClass = classLoader.loadClass("oracle.jdbc.OracleBfile");
boolean isFileExist = (boolean) oracleBfileClass.getMethod("fileExists").invoke(bfile);
if (!isFileExist) {
Expand Down Expand Up @@ -341,6 +452,12 @@ private void handleOracleSpecificType(ResultSet resultSet, StructuredRecord.Buil
case OracleSourceSchemaReader.LONG_RAW:
recordBuilder.set(field.getName(), resultSet.getBytes(columnIndex));
break;
case Types.STRUCT:
Struct structValue = (Struct) resultSet.getObject(columnIndex);
if (structValue != null) {
recordBuilder.set(field.getName(), convertStructToRecord(structValue, nonNullSchema));
}
break;
case Types.DECIMAL:
case Types.NUMERIC:
// This is the only way to differentiate FLOAT/REAL columns from other numeric columns, that based on NUMBER.
Expand Down Expand Up @@ -371,6 +488,57 @@ private void handleOracleSpecificType(ResultSet resultSet, StructuredRecord.Buil
}
}

/**
* Converts a JDBC {@link Struct} into a {@link StructuredRecord} based on the provided schema.
*
* @param struct the SQL structured type containing the source data attributes
* @param schema the target record schema defining the fields to map
* @return a populated {@code StructuredRecord} instance
* @throws SQLException if an error occurs reading the struct attributes or metadata
*/
protected StructuredRecord convertStructToRecord(Struct struct, Schema schema)
throws SQLException {
Map<String, Object> attributeMap = getAttributeMap(struct, schema);
StructuredRecord.Builder builder = StructuredRecord.builder(schema);

for (Schema.Field field : schema.getFields()) {
Object attrValue = attributeMap.get(field.getName());
setFieldValue(builder, field, attrValue);
}
return builder.build();
}

/**
* Extracts attributes from a {@link Struct} into a case-insensitive map indexed by column name.
* Uses reflection to extract underlying metadata (e.g., from Oracle StructDescriptor).
*
* @param struct the source SQL structured type
* @param schema the target schema used for context in error messages
* @return a case-insensitive {@code Map} linking column names to their attribute values
* @throws SQLException if metadata extraction fails or driver-specific methods are inaccessible
*/
private Map<String, Object> getAttributeMap(Struct struct, Schema schema) throws SQLException {
Map<String, Object> attributeMap = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
Object[] attributes = struct.getAttributes();

try {
Object descriptor = struct.getClass().getMethod("getDescriptor").invoke(struct);
ResultSetMetaData metaData =
(ResultSetMetaData) descriptor.getClass().getMethod("getMetaData").invoke(descriptor);
for (int i = 1; i <= metaData.getColumnCount() && (i - 1) < attributes.length; i++) {
attributeMap.put(metaData.getColumnName(i), attributes[i - 1]);
}
} catch (SQLException | NoSuchMethodException e) {
throw new SQLException(String.format("Failed to retrieve attribute metadata for Oracle STRUCT schema '%s': %s",
schema.getRecordName(), e.getMessage()), e);
} catch (InvocationTargetException | IllegalAccessException e) {
throw new SQLException(String.format("Unable to retrieve attribute metadata for Oracle STRUCT schema '%s'. "
+ "Ensure the Oracle JDBC driver supports JDBC StructDescriptor metadata.",
schema.getRecordName()), e);
}
return attributeMap;
}

/**
* Get the scale set in Non-nullable schema associated with the schema
* */
Expand Down
Loading
Loading