Wednesday, January 9, 2019

Oracle to Kafka made easy.


Sometimes we need to expose data from an Oracle database to external systems or services.
In our experience, this usually ends up with some kind of interim tables, or even interim databases, or REST service solutions, and the downsides of such approaches are well known.

These days one of the best ways for services to communicate is a message broker, namely Apache Kafka.

This time we would like to send data from an Oracle database to Kafka. After some googling we came up with GoldenGate, WebLogic, a bunch of connectors, etc… Oh my… all of them proprietary, overcomplicated and heavy, while we just need to capture changes in one table via a database trigger and then send them to Kafka.

But a Kafka producer is not (yet?) available in native PL/SQL, and building a REST service instead requires additional “moving parts” and is potentially slow.

As we know, talking to Kafka from Java code is dead simple, so we decided to build our API in Java and then load the Java code into the database as Java stored procedures.
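Loading Java into the database and exposing it to PL/SQL relies on standard Oracle JVM features; a minimal sketch of the pattern, with purely hypothetical names (the real implementation is described below), looks like this:

-- Hypothetical illustration only: compile a Java class inside the database
create or replace and compile java source named "HelloJava" as
public class HelloJava {
    public static String greet(String name) {
        return "Hello, " + name + "!";
    }
}
/
-- PL/SQL call specification exposing the static method as a SQL-callable function
create or replace function hello_java(p_name in varchar2) return varchar2
as language java name 'HelloJava.greet(java.lang.String) return java.lang.String';
/
select hello_java('Kafka') from dual;

The Kafka wrapper described below follows the same pattern, with the Kafka client libraries loaded into the schema first.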

Below you can find the full description of the solution along with the source code.
As a result we got a fully functional PL/SQL API that lets us easily send data from an Oracle database to Apache Kafka without heavy and complex proprietary software, and that is simpler and potentially much faster than a custom REST service.

In this article we show how to publish messages from Oracle Database 12+ to Apache Kafka brokers.
Oracle Database to Apache Kafka integration can be achieved in many ways; here we explain how to do it using the internal Oracle Aurora JVM. The target audience for this type of integration is sites with active usage of PL/SQL.

I. Installation

I.1 Oracle Database preparation

If you are still using Oracle Database 12.1.x, you need to configure it to use JDK 7. To do that:
perl $ORACLE_HOME/javavm/install/update_javavm_binaries.pl 7
cd $ORACLE_HOME/rdbms/lib
make -f ins_rdbms.mk ioracle
After that, for an existing instance you need to perform the JDK upgrade using the $ORACLE_HOME/javavm/install/update_javavm_db.sql script:
$ORACLE_HOME/perl/bin/perl -I $ORACLE_HOME/rdbms/admin $ORACLE_HOME/rdbms/admin/catcon.pl -b jvm_component_upgrade.log $ORACLE_HOME/javavm/install/update_javavm_db.sql
or create a new instance using the DBCA utility, or a set of scripts if you are an old-school Oracle DBA.

You can check the version of the Oracle Aurora JVM using:
select dbms_java.get_jdk_version from dual;

No additional steps are required for Oracle Database 12.2+ including 18c.
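Regardless of the database version, you can also confirm that the JServer component itself is valid (assuming you have access to DBA_REGISTRY):

select comp_name, version, status
  from dba_registry
 where comp_id = 'JAVAVM';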

I.2 Schema owner setup

Additional Java security permissions are required to run Kafka producer code inside Oracle Database. We use the name APPS (as in Oracle Applications/Oracle E-Business Suite, since 1998):
create user apps identified by apps
default tablespace users
temporary tablespace temp
quota unlimited on users;

grant CONNECT,RESOURCE,JAVAUSERPRIV to APPS;
exec dbms_java.grant_permission( 'APPS', 'SYS:oracle.aurora.security.JServerPermission', 'Verifier', '');
exec dbms_java.grant_permission( 'APPS', 'SYS:java.lang.RuntimePermission', 'getClassLoader', '');
exec dbms_java.grant_permission( 'APPS', 'SYS:javax.management.MBeanTrustPermission', 'register', '');
exec dbms_java.grant_permission( 'APPS', 'SYS:javax.management.MBeanServerPermission', 'createMBeanServer', '');
exec dbms_java.grant_permission( 'APPS', 'SYS:javax.management.MBeanPermission', 'org.apache.kafka.common.metrics.JmxReporter$KafkaMbean#-*', 'registerMBean');
exec dbms_java.grant_permission( 'APPS', 'SYS:javax.management.MBeanPermission', 'org.apache.kafka.common.metrics.JmxReporter$KafkaMbean#-*', 'unregisterMBean');
exec dbms_java.grant_permission( 'APPS', 'SYS:javax.management.MBeanPermission', 'org.apache.kafka.common.utils.AppInfoParser$AppInfo#-*', 'registerMBean');
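To double-check that the grants took effect, you can query DBA_JAVA_POLICY (assuming you have the privileges to see it):

select grantee, type_name, name, action, enabled
  from dba_java_policy
 where grantee = 'APPS'
 order by seq;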

I.3 Loading of Kafka classes

We use the Apache Kafka distribution as the source of the required libraries.

I.3.A Oracle Database 12.1

We recommend Kafka 1.1.1 (kafka_2.11-1.1.1) for Oracle Database 12.1.x. Untar the downloaded archive and then issue the following in SQL*Plus as the schema owner created in Step I.2:
exec dbms_java.loadjava('-r -v -f -noverify /oracle/product/kafka_2.11-1.1.1/libs/lz4-java-1.4.1.jar');
exec dbms_java.loadjava('-r -v -f -noverify /oracle/product/kafka_2.11-1.1.1/libs/log4j-1.2.17.jar');
exec dbms_java.loadjava('-r -v -f -noverify /oracle/product/kafka_2.11-1.1.1/libs/slf4j-api-1.7.25.jar');
exec dbms_java.loadjava('-r -v -f -noverify /oracle/product/kafka_2.11-1.1.1/libs/slf4j-log4j12-1.7.25.jar');
exec dbms_java.loadjava('-r -v -f -noverify /oracle/product/kafka_2.11-1.1.1/libs/kafka-clients-1.1.1.jar');

I.3.B Oracle Database 12.2+

Use Kafka 2.1.0 (kafka_2.12-2.1.0), the latest stable Kafka version at the time of writing. Untar the downloaded archive and then issue the following in SQL*Plus as the schema owner created in Step I.2:
exec dbms_java.loadjava('-r -v -f -noverify /oracle/product/kafka_2.12-2.1.0/libs/lz4-java-1.5.0.jar');
exec dbms_java.loadjava('-r -v -f -noverify /oracle/product/kafka_2.12-2.1.0/libs/log4j-1.2.17.jar');
exec dbms_java.loadjava('-r -v -f -noverify /oracle/product/kafka_2.12-2.1.0/libs/slf4j-api-1.7.25.jar');
exec dbms_java.loadjava('-r -v -f -noverify /oracle/product/kafka_2.12-2.1.0/libs/slf4j-log4j12-1.7.25.jar');
exec dbms_java.loadjava('-r -v -f -noverify /oracle/product/kafka_2.12-2.1.0/libs/jackson-core-2.9.7.jar');
exec dbms_java.loadjava('-r -v -f -noverify /oracle/product/kafka_2.12-2.1.0/libs/jackson-annotations-2.9.7.jar');
exec dbms_java.loadjava('-r -v -f -noverify /oracle/product/kafka_2.12-2.1.0/libs/jackson-databind-2.9.7.jar');
exec dbms_java.loadjava('-r -v -f -noverify /oracle/product/kafka_2.12-2.1.0/libs/kafka-clients-2.1.0.jar');
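After loading, one way to spot-check that the Kafka producer classes made it into the schema (the same query works for either Kafka version) is:

select dbms_java.longname(object_name) as class_name, status
  from user_objects
 where object_type = 'JAVA CLASS'
   and dbms_java.longname(object_name) like 'org/apache/kafka/clients/producer/%';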

For any Oracle Database version, check for invalid objects:
select count(*) from user_objects where status<>'VALID';
If any exist, recompile them using:
exec utl_recomp.recomp_serial;
or
exec utl_recomp.recomp_parallel(<NUMBER_OF_THREADS>);

I.4 Kafka wrapper installation

Start SQL*Plus and, as the schema owner created in Step I.2, run the downloaded scripts in the following order:
@KafkaUtils.sql
@KafkaUtilsJava.pkg
@KafkaUtils.pkg

This will create the object type A2_TUPLE, which represents a key-value pair, the table type A2_ARRAY_OF_TUPLES (an array of A2_TUPLE), the Java source named eu.solutions.a2.aurora.kafka.KafkaUtils and the PL/SQL package A2_KAFKA_UTILS.
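For orientation only: the exact DDL ships in KafkaUtils.sql, but conceptually the two types are just a key-value object and a nested table of it, along these lines (attribute names below are assumptions; do not run this if you have already installed the scripts):

-- illustrative sketch, not the shipped definition
create or replace type A2_TUPLE as object (
  key   varchar2(4000),  -- property or field name
  value varchar2(4000)   -- property or field value
);
/
create or replace type A2_ARRAY_OF_TUPLES as table of A2_TUPLE;
/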
Do not forget to check for invalid objects:
select count(*) from user_objects where status<>'VALID';
If any exist, recompile them using:
exec utl_recomp.recomp_serial;
or
exec utl_recomp.recomp_parallel(<NUMBER_OF_THREADS>);

II. Usage example

Typical usage consists of creating a producer with a call to A2_KAFKA_UTILS.CREATE_PRODUCER, sending messages with A2_KAFKA_UTILS.SEND_JSON_MESSAGE or A2_KAFKA_UTILS.SEND_TEXT_MESSAGE, and then closing the producer with A2_KAFKA_UTILS.CLOSE_PRODUCER.

declare
  OP_RESULT   varchar2(2000);
  KAFKA_PROPS A2_ARRAY_OF_TUPLES := A2_ARRAY_OF_TUPLES();
  MSG_2_SEND  A2_ARRAY_OF_TUPLES := A2_ARRAY_OF_TUPLES();
begin
  -- Set mandatory Kafka producer properties
  KAFKA_PROPS.extend(4);
  KAFKA_PROPS(1) := A2_TUPLE('bootstrap.servers', 'kafka1.a2-solutions.eu:9092');
  KAFKA_PROPS(2) := A2_TUPLE('client.id', 'KafkaOracleAuroraProducer');
  KAFKA_PROPS(3) := A2_TUPLE('key.serializer', 'org.apache.kafka.common.serialization.StringSerializer');
  KAFKA_PROPS(4) := A2_TUPLE('value.serializer', 'org.apache.kafka.common.serialization.StringSerializer');
  -- Create producer with name TestProducer
  OP_RESULT := A2_KAFKA_UTILS.CREATE_PRODUCER('TestProducer', KAFKA_PROPS);
  if ('SUCCESS' = OP_RESULT) then
    begin
      MSG_2_SEND.extend(3);
      MSG_2_SEND(1) := A2_TUPLE('INVOICE_ID', '13474');
      MSG_2_SEND(2) := A2_TUPLE('INVOICE_NUM', 'AP-2019-13474');
      MSG_2_SEND(3) := A2_TUPLE('INVOICE_AMOUNT', '2000');
      OP_RESULT := A2_KAFKA_UTILS.SEND_JSON_MESSAGE('TestProducer', 'TestTopic', 'Test-Message-Key', MSG_2_SEND);
      if ('SUCCESS' = OP_RESULT) then
        OP_RESULT := A2_KAFKA_UTILS.CLOSE_PRODUCER('TestProducer');
      else
        raise_application_error(-20000, OP_RESULT);
      end if;
    end;
  else
    raise_application_error(-20000, OP_RESULT);
  end if;
end;
/
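Coming back to the original motivation of capturing changes in a single table via a database trigger, a sketch of such a trigger could look like the one below. The table, its columns and the topic are hypothetical, and the producer is assumed to have been created beforehand with A2_KAFKA_UTILS.CREATE_PRODUCER; in a real system you would also decide how a non-SUCCESS result should affect the transaction.

create or replace trigger ap_invoices_aft_ins
after insert on ap_invoices   -- hypothetical table
for each row
declare
  L_RESULT varchar2(2000);
  L_MSG    A2_ARRAY_OF_TUPLES := A2_ARRAY_OF_TUPLES();
begin
  L_MSG.extend(3);
  L_MSG(1) := A2_TUPLE('INVOICE_ID', to_char(:new.invoice_id));
  L_MSG(2) := A2_TUPLE('INVOICE_NUM', :new.invoice_num);
  L_MSG(3) := A2_TUPLE('INVOICE_AMOUNT', to_char(:new.invoice_amount));
  -- Producer 'TestProducer' is assumed to exist already
  L_RESULT := A2_KAFKA_UTILS.SEND_JSON_MESSAGE('TestProducer', 'TestTopic', to_char(:new.invoice_id), L_MSG);
  if ('SUCCESS' != L_RESULT) then
    raise_application_error(-20000, L_RESULT);
  end if;
end;
/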

III. A2_KAFKA_UTILS functions

III.1 CREATE_PRODUCER

Creates a Kafka producer.

Parameters

PRODUCER_NAME - name of producer
PRODUCER_PROPS - array of tuples with Kafka producer properties

Returns

'SUCCESS' for a successful operation, or the error stack in case of error

III.2 CLOSE_PRODUCER

Closes the Kafka producer.

Parameters

PRODUCER_NAME - name of producer

Returns

'SUCCESS' for a successful operation, or the error stack in case of error

III.3 SEND_TEXT_MESSAGE

Sends a text message to the Kafka broker.

Parameters

PRODUCER_NAME - name of producer
TOPIC - Kafka broker topic
MSG_KEY - Message key
MSG_VALUE - Message value

Returns

'SUCCESS' for a successful operation, or the error stack in case of error
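A minimal call following the parameters above, reusing the producer and topic from the example in Section II (sketch only):

-- inside a PL/SQL block where OP_RESULT is declared and 'TestProducer' already exists
OP_RESULT := A2_KAFKA_UTILS.SEND_TEXT_MESSAGE('TestProducer', 'TestTopic', 'Test-Message-Key', 'plain text payload');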

III.4 SEND_JSON_MESSAGE

Converts the array of tuples to JSON and sends the resulting string to the Kafka broker.

Parameters

PRODUCER_NAME - name of producer
TOPIC - Kafka broker topic
MSG_KEY - Message key
ARRAY_OF_VALUES - Message value

Returns

'SUCCESS' for a successful operation, or the error stack in case of error

III.5 ARRAY_2_JSON

Converts an array of tuples (A2_ARRAY_OF_TUPLES) to a JSON formatted string.

Parameters

ARRAY_OF_VALUES - in, A2_ARRAY_OF_TUPLES

Returns

JSON formatted string representation of the given array of tuples
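For example, the array from Section II can be converted without sending anything to Kafka (the exact formatting of the output is up to the implementation):

declare
  L_ARR  A2_ARRAY_OF_TUPLES := A2_ARRAY_OF_TUPLES(
            A2_TUPLE('INVOICE_ID', '13474'),
            A2_TUPLE('INVOICE_NUM', 'AP-2019-13474'));
  L_JSON varchar2(32767);
begin
  L_JSON := A2_KAFKA_UTILS.ARRAY_2_JSON(L_ARR);
  dbms_output.put_line(L_JSON);  -- a JSON object built from the key-value pairs
end;
/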



Tuesday, January 8, 2019

Oracle SYS.AUD$ performance issues, Apache Kafka comes to the rescue!


OK, we'll continue the topic of Oracle Database and Apache Kafka integration, in which we describe a very straightforward use of Oracle Database as an Apache Kafka producer.
Here we discuss how Apache Kafka can help relieve some of the pain of running an Oracle database.
Just google for SYS.AUD$: about 4,470,000 results, most of them related to problems, with about 592,000 results about performance issues with SYS.AUD$!
Apache Kafka comes to the rescue!
Starting from Oracle Database 11g the default options are:
audit_trail = db
audit_sys_operations = true
This produces a lot of records in the SYS.AUD$ table. Fortunately, starting from Oracle Database 12c this table lives in the SYSAUX tablespace, and Unified Auditing was introduced too. But this still generates contention in the same database instance! We can disable audit_trail, but what do we do if we must audit some actions or data changes according to industry (PCI DSS, for example) or government standards?
The solution is to set the audit destination to the file system, watch this file system for changes, and then transfer the audit files to an Apache Kafka broker. This consumes CPU on the database server only for watching the file system and transferring files to Kafka.
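Redirecting the standard audit trail to the file system is an ordinary init parameter change; the values below are examples only (adjust the directory to your environment, and note that both changes require an instance restart):

-- write audit records to operating system files instead of SYS.AUD$
alter system set audit_trail=os scope=spfile;
-- or, for structured records that are easier to parse downstream:
-- alter system set audit_trail=xml, extended scope=spfile;
-- directory where the audit files will be written
alter system set audit_file_dest='/u01/app/oracle/audit' scope=spfile;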
Source code and installation instructions are available at https://github.com/averemee-si/oraaud-kafka.
Using Kafka Streams or Kafka consumers you can perform any analysis of the audit information without
“eating” the CPU and disk I/O of the database server.