Oracle to Kafka made easy.
Sometimes we need to expose data from Oracle database to external systems/services.
In our experience, it ends ups with kind of interim tables or even interim databases or REST service solutions, the downsides of such approaches are well known.
These days one of the best solution to communicate services is message broker, namely Apache Kafka.
This time we would like to send data from Oracle database to Kafka, after some googling we come up with – GoldenGate, WebLogic, bunch of connectors and etc… Oh my… all of them proprietary, overcomplicated and heavy, but we just need to capture changes in one table via database trigger, then send them to Kafka.
But it is not (yet?) implemented in native pl/sql, creating REST service on the other hand requires additional “moving parts” and potentially slow.
As we know, communicate to Kafka from Java code is dead simple, so we decided to create our API in Java then load Java code into Oracle stored procedure.
Below you can find the full description of the solution along with source code.
As result we got an a fully functional pl/sql API allowing us easily send data from Oracle database to Apache Kafka without using heavy and complex proprietary software, easier and potentially way faster comparable to custom REST service.
In this article we will show how to publish messages from Oracle Database 12+ to Apache Kafka brokers.
Oracle Database to Apache Kafka integration can be achieved using many methods. In this article we explain how to do this using internal Oracle Aurora JVM. Target audience for this type of integration are sites with active usage of PL/SQL.
I. Installation
I.1 Oracle Database preparation
If you still using Oracle Database 12.1.x you need to configure it to use JDK7. To do that:
perl $ORACLE_HOME/javavm/install/update_javavm_binaries.pl 7
cd $ORACLE_HOME/rdbms/lib
make -f ins_rdbms.mk ioracle
After that for existing instance you need to perform JDK upgrade using $ORACLE_HOME/javavm/install/update_javavm_db.sql script:
$ORACLE_HOME/perl/bin/perl -I $ORACLE_HOME/rdbms/admin $ORACLE_HOME/rdbms/admin/catcon.pl -b jvm_component_upgrade.log $ORACLE_HOME/javavm/install/update_javavm_db.sql
or create new instance using DBCA utility or set of scripts if you are old school Oracle DBA.
You can check version of Oracle Aurora JVM using
select dbms_java.get_jdk_version from dual;
No additional steps are required for Oracle Database 12.2+ including 18c.
I.2 Schema owner setup
Additional Java security permissions required to run Kafka producer code in Oracle Database. We use name APPS (Oracle Applications/Oracle E-Business Suite from 1998):
create user apps identified by apps
default tablespace users
temporary tablespace temp
quota unlimited on users;
grant CONNECT,RESOURCE,JAVAUSERPRIV to APPS;
exec dbms_java.grant_permission( 'APPS', 'SYS:oracle.aurora.security.JServerPermission', 'Verifier', '');
exec dbms_java.grant_permission( 'APPS', 'SYS:java.lang.RuntimePermission', 'getClassLoader', '');
exec dbms_java.grant_permission( 'APPS', 'SYS:javax.management.MBeanTrustPermission', 'register', '');
exec dbms_java.grant_permission( 'APPS', 'SYS:javax.management.MBeanServerPermission', 'createMBeanServer', '');
exec dbms_java.grant_permission( 'APPS', 'SYS:javax.management.MBeanPermission', 'org.apache.kafka.common.metrics.JmxReporter$KafkaMbean#-*', 'registerMBean');
exec dbms_java.grant_permission( 'APPS', 'SYS:javax.management.MBeanPermission', 'org.apache.kafka.common.metrics.JmxReporter$KafkaMbean#-*', 'unregisterMBean');
exec dbms_java.grant_permission( 'APPS', 'SYS:javax.management.MBeanPermission', 'org.apache.kafka.common.utils.AppInfoParser$AppInfo#-*', 'registerMBean');
I.3 Loading of Kafka classes
We use Apache Kafka distribution as source of required libraries.
I.3.A Oracle Database 12.1
Download Apache Kafka 1.1.1 from https://www.apache.org/dyn/closer.cgi?path=/kafka/1.1.1/kafka_2.11-1.1.1.tgz
We recommend this version for Oracle Database 12.1.x, untar downloaded archive and then issue in SQL*Plus as schema owner created on Step I.2
exec dbms_java.loadjava('-r -v -f -noverify /oracle/product/kafka_2.11-1.1.1/libs/lz4-java-1.4.1.jar');
exec dbms_java.loadjava('-r -v -f -noverify /oracle/product/kafka_2.11-1.1.1/libs/log4j-1.2.17.jar');
exec dbms_java.loadjava('-r -v -f -noverify /oracle/product/kafka_2.11-1.1.1/libs/slf4j-api-1.7.25.jar');
exec dbms_java.loadjava('-r -v -f -noverify /oracle/product/kafka_2.11-1.1.1/libs/slf4j-log4j12-1.7.25.jar');
exec dbms_java.loadjava('-r -v -f -noverify /oracle/product/kafka_2.11-1.1.1/libs/kafka-clients-1.1.1.jar');
I.3.B Oracle Database 12.2+
Download Apache Kafka 2.1.0 from https://www.apache.org/dyn/closer.cgi?path=/kafka/2.1.0/kafka_2.12-2.1.0.tgz
(at moment of writing - latest stable Kafka version).
untar downloaded archive and then issue in SQL*Plus as schema owner created on Step I.2
exec dbms_java.loadjava('-r -v -f -noverify /oracle/product/kafka_2.12-2.1.0/libs/lz4-java-1.5.0.jar');
exec dbms_java.loadjava('-r -v -f -noverify /oracle/product/kafka_2.12-2.1.0/libs/log4j-1.2.17.jar');
exec dbms_java.loadjava('-r -v -f -noverify /oracle/product/kafka_2.12-2.1.0/libs/slf4j-api-1.7.25.jar');
exec dbms_java.loadjava('-r -v -f -noverify /oracle/product/kafka_2.12-2.1.0/libs/slf4j-log4j12-1.7.25.jar');
exec dbms_java.loadjava('-r -v -f -noverify /oracle/product/kafka_2.12-2.1.0/libs/jackson-core-2.9.7.jar');
exec dbms_java.loadjava('-r -v -f -noverify /oracle/product/kafka_2.12-2.1.0/libs/jackson-annotations-2.9.7.jar');
exec dbms_java.loadjava('-r -v -f -noverify /oracle/product/kafka_2.12-2.1.0/libs/jackson-databind-2.9.7.jar');
exec dbms_java.loadjava('-r -v -f -noverify /oracle/product/kafka_2.12-2.1.0/libs/kafka-clients-2.1.0.jar');
For any Oracle Database version check for invalid objects:
select count(*) from user_objects where status<>'VALID';
If they exist recompile it using:
exec utl_recomp.recomp_serial;
or
exec utl_recomp.recomp_parallel(<NUMBER_OF_THREADS>);
I.4 Kafka wrapper installation
Download or clone https://github.com/averemee-si/aurora-kafka
Start SQL*Plus and run as schema owner created on Step I.2 downloaded scripts in following order:
@KafkaUtils.sql
@KafkaUtilsJava.pkg
@KafkaUtils.pkg
This will create object type A2_TUPLE which simulates key-value pair, table type A2_ARRAY_OF_TUPLES - array of A2_TUPLE, Java source named eu.solutions.a2.aurora.kafka.KafkaUtils and PL/SQL package A2_KAFKA_UTILS.
Do not forget to check for invalid objects:
select count(*) from user_objects where status<>'VALID';
If they exist recompile using:
exec utl_recomp.recomp_serial;
or
exec utl_recomp.recomp_parallel(<NUMBER_OF_THREADS>);
II. Usage example
Typical usage is in creation of producer with call to A2_KAFKA_UTILS.CREATE_PRODUCER then sending messages with A2_KAFKA_UTILS.SEND_JSON_MESSAGE or A2_KAFKA_UTILS.SEND_STRING_MESSAGE and then closing producer with A2_KAFKA_UTILS.CLOSE_PRODUCER.
declare
OP_RESULT varchar2(2000);
KAFKA_PROPS A2_ARRAY_OF_TUPLES := A2_ARRAY_OF_TUPLES();
MSG_2_SEND A2_ARRAY_OF_TUPLES := A2_ARRAY_OF_TUPLES();
begin
-- Set mandatory Kafka producer properties
KAFKA_PROPS.extend(4);
KAFKA_PROPS(1) := A2_TUPLE('bootstrap.servers', 'kafka1.a2-solutions.eu:9092');
KAFKA_PROPS(2) := A2_TUPLE('client.id', 'KafkaOracleAuroraProducer');
KAFKA_PROPS(3) := A2_TUPLE('key.serializer', 'org.apache.kafka.common.serialization.StringSerializer');
KAFKA_PROPS(4) := A2_TUPLE('value.serializer', 'org.apache.kafka.common.serialization.StringSerializer');
-- Create producer with name TestProducer
OP_RESULT := A2_KAFKA_UTILS.CREATE_PRODUCER('TestProducer', KAFKA_PROPS);
if ('SUCCESS' = OP_RESULT) then
begin
MSG_2_SEND.extend(3);
MSG_2_SEND(1) := A2_TUPLE('INVOICE_ID', '13474');
MSG_2_SEND(2) := A2_TUPLE('INVOICE_NUM', 'AP-2019-13474');
MSG_2_SEND(3) := A2_TUPLE('INVOICE_AMOUNT', '2000');
OP_RESULT := A2_KAFKA_UTILS.SEND_JSON_MESSAGE('TestProducer','TestTopic','Test-Message-Key',MSG_2_SEND);
if ('SUCCESS' = OP_RESULT) then
OP_RESULT := A2_KAFKA_UTILS.CLOSE_PRODUCER('TestProducer');
else
raise_application_error(-20000, OP_RESULT);
end if;
end;
else
raise_application_error(-20000, OP_RESULT);
end if;
end;
/
III. A2_KAFKA_UTILS functions
III.1 CREATE_PRODUCER
Creates Kafka producer
Parameters
PRODUCER_NAME - name of producer
PRODUCER_PROPS - array of tuples with Kafka producer properties
Returns
'SUCCESS' - for successful operation or errorstack in case of error
III.2 CLOSE_PRODUCER
Closes Kafka producer
Parameters
PRODUCER_NAME - name of producer
Returns
'SUCCESS' - for successful operation or errorstack in case of error
III.3 SEND_TEXT_MESSAGE
Sends text message to Kafka broker
Parameters
PRODUCER_NAME - name of producer
TOPIC - Kafka broker topic
MSG_KEY - Message key
MSG_VALUE - Message value
Returns
'SUCCESS' - for successful operation or errorstack in case of error
III.4 SEND_JSON_MESSAGE
JSONifies array of tiples and send this string to Kafka broker
Parameters
PRODUCER_NAME - name of producer
TOPIC - Kafka broker topic
MSG_KEY - Message key
ARRAY_OF_VALUES - Message value
Returns
'SUCCESS' - for successful operation or errorstack in case of error
III.5 ARRAY_2_JSON
Converts array of tuples (A2_ARRAY_OF_TUPLES) to JSON formatted string.
Parameters
ARRAY_OF_VALUES - in, A2_ARRAY_OF_TUPLES
Returns
JSON formatted string presentation of given array of tuples