Wednesday, January 9, 2019

Oracle to Kafka made easy.


Sometimes we need to expose data from an Oracle database to external systems or services.
In our experience, this ends up with interim tables, interim databases, or REST services; the downsides of such approaches are well known.

These days one of the best ways for services to communicate is a message broker, namely Apache Kafka.

This time we wanted to send data from an Oracle database to Kafka. After some googling we came up with GoldenGate, WebLogic, a bunch of connectors, and so on. Oh my: all of them are proprietary, overcomplicated, and heavy, while we just need to capture changes in one table via a database trigger and send them to Kafka.

But a Kafka client is not (yet?) implemented in native PL/SQL, and creating a REST service requires additional “moving parts” and is potentially slow.

As we know, communicating with Kafka from Java code is dead simple, so we decided to write our API in Java and then load the Java code into the Oracle database as stored procedures.

Below you can find the full description of the solution along with the source code.
As a result we got a fully functional PL/SQL API that lets us easily send data from an Oracle database to Apache Kafka without heavy and complex proprietary software; it is simpler and potentially much faster than a custom REST service.

In this article we show how to publish messages from Oracle Database 12+ to Apache Kafka brokers.
Oracle Database to Apache Kafka integration can be achieved in many ways. Here we explain how to do it using the internal Oracle Aurora JVM. The target audience for this type of integration is sites that make active use of PL/SQL.

I. Installation

I.1 Oracle Database preparation

If you are still using Oracle Database 12.1.x, you need to configure it to use JDK 7. To do that:
perl $ORACLE_HOME/javavm/install/update_javavm_binaries.pl 7
cd $ORACLE_HOME/rdbms/lib
make -f ins_rdbms.mk ioracle
After that, for an existing instance you need to perform the JDK upgrade using the $ORACLE_HOME/javavm/install/update_javavm_db.sql script:
$ORACLE_HOME/perl/bin/perl -I $ORACLE_HOME/rdbms/admin $ORACLE_HOME/rdbms/admin/catcon.pl -b jvm_component_upgrade.log $ORACLE_HOME/javavm/install/update_javavm_db.sql
or create a new instance using the DBCA utility, or a set of scripts if you are an old-school Oracle DBA.

You can check the version of the Oracle Aurora JVM using
select dbms_java.get_jdk_version from dual;

No additional steps are required for Oracle Database 12.2+ including 18c.

I.2 Schema owner setup

Additional Java security permissions are required to run Kafka producer code in Oracle Database. We use the schema name APPS (the name used by Oracle Applications/Oracle E-Business Suite since 1998):
create user apps identified by apps
default tablespace users
temporary tablespace temp
quota unlimited on users;

grant CONNECT,RESOURCE,JAVAUSERPRIV to APPS;
exec dbms_java.grant_permission( 'APPS', 'SYS:oracle.aurora.security.JServerPermission', 'Verifier', '');
exec dbms_java.grant_permission( 'APPS', 'SYS:java.lang.RuntimePermission', 'getClassLoader', '');
exec dbms_java.grant_permission( 'APPS', 'SYS:javax.management.MBeanTrustPermission', 'register', '');
exec dbms_java.grant_permission( 'APPS', 'SYS:javax.management.MBeanServerPermission', 'createMBeanServer', '');
exec dbms_java.grant_permission( 'APPS', 'SYS:javax.management.MBeanPermission', 'org.apache.kafka.common.metrics.JmxReporter$KafkaMbean#-*', 'registerMBean');
exec dbms_java.grant_permission( 'APPS', 'SYS:javax.management.MBeanPermission', 'org.apache.kafka.common.metrics.JmxReporter$KafkaMbean#-*', 'unregisterMBean');
exec dbms_java.grant_permission( 'APPS', 'SYS:javax.management.MBeanPermission', 'org.apache.kafka.common.utils.AppInfoParser$AppInfo#-*', 'registerMBean');

I.3 Loading of Kafka classes

We use the Apache Kafka distribution as the source of the required libraries.

I.3.A Oracle Database 12.1

We recommend Kafka 1.1.1 (the kafka_2.11-1.1.1 distribution) for Oracle Database 12.1.x. Untar the downloaded archive and then, in SQL*Plus as the schema owner created in Step I.2, issue:
exec dbms_java.loadjava('-r -v -f -noverify /oracle/product/kafka_2.11-1.1.1/libs/lz4-java-1.4.1.jar');
exec dbms_java.loadjava('-r -v -f -noverify /oracle/product/kafka_2.11-1.1.1/libs/log4j-1.2.17.jar');
exec dbms_java.loadjava('-r -v -f -noverify /oracle/product/kafka_2.11-1.1.1/libs/slf4j-api-1.7.25.jar');
exec dbms_java.loadjava('-r -v -f -noverify /oracle/product/kafka_2.11-1.1.1/libs/slf4j-log4j12-1.7.25.jar');
exec dbms_java.loadjava('-r -v -f -noverify /oracle/product/kafka_2.11-1.1.1/libs/kafka-clients-1.1.1.jar');

I.3.B Oracle Database 12.2+
We recommend Kafka 2.1.0 (the kafka_2.12-2.1.0 distribution), the latest stable Kafka version at the time of writing.
Untar the downloaded archive and then, in SQL*Plus as the schema owner created in Step I.2, issue:
exec dbms_java.loadjava('-r -v -f -noverify /oracle/product/kafka_2.12-2.1.0/libs/lz4-java-1.5.0.jar');
exec dbms_java.loadjava('-r -v -f -noverify /oracle/product/kafka_2.12-2.1.0/libs/log4j-1.2.17.jar');
exec dbms_java.loadjava('-r -v -f -noverify /oracle/product/kafka_2.12-2.1.0/libs/slf4j-api-1.7.25.jar');
exec dbms_java.loadjava('-r -v -f -noverify /oracle/product/kafka_2.12-2.1.0/libs/slf4j-log4j12-1.7.25.jar');
exec dbms_java.loadjava('-r -v -f -noverify /oracle/product/kafka_2.12-2.1.0/libs/jackson-core-2.9.7.jar');
exec dbms_java.loadjava('-r -v -f -noverify /oracle/product/kafka_2.12-2.1.0/libs/jackson-annotations-2.9.7.jar');
exec dbms_java.loadjava('-r -v -f -noverify /oracle/product/kafka_2.12-2.1.0/libs/jackson-databind-2.9.7.jar');
exec dbms_java.loadjava('-r -v -f -noverify /oracle/product/kafka_2.12-2.1.0/libs/kafka-clients-2.1.0.jar');

For any Oracle Database version, check for invalid objects:
select count(*) from user_objects where status<>'VALID';
If any exist, recompile them using:
exec utl_recomp.recomp_serial;
or
exec utl_recomp.recomp_parallel(<NUMBER_OF_THREADS>);

I.4 Kafka wrapper installation

Start SQL*Plus as the schema owner created in Step I.2 and run the downloaded scripts in the following order:
@KafkaUtils.sql
@KafkaUtilsJava.pkg
@KafkaUtils.pkg

This creates the object type A2_TUPLE, which represents a key-value pair; the table type A2_ARRAY_OF_TUPLES, an array of A2_TUPLE; the Java source eu.solutions.a2.aurora.kafka.KafkaUtils; and the PL/SQL package A2_KAFKA_UTILS.
Do not forget to check for invalid objects:
select count(*) from user_objects where status<>'VALID';
If any exist, recompile them using:
exec utl_recomp.recomp_serial;
or
exec utl_recomp.recomp_parallel(<NUMBER_OF_THREADS>);

II. Usage example

Typical usage is to create a producer with a call to A2_KAFKA_UTILS.CREATE_PRODUCER, send messages with A2_KAFKA_UTILS.SEND_JSON_MESSAGE or A2_KAFKA_UTILS.SEND_STRING_MESSAGE, and then close the producer with A2_KAFKA_UTILS.CLOSE_PRODUCER.

declare
    OP_RESULT varchar2(2000);
    KAFKA_PROPS A2_ARRAY_OF_TUPLES := A2_ARRAY_OF_TUPLES();
    MSG_2_SEND A2_ARRAY_OF_TUPLES := A2_ARRAY_OF_TUPLES();
begin
    -- Set mandatory Kafka producer properties
    KAFKA_PROPS.extend(4);
    KAFKA_PROPS(1) := A2_TUPLE('bootstrap.servers', 'kafka1.a2-solutions.eu:9092');
    KAFKA_PROPS(2) := A2_TUPLE('client.id', 'KafkaOracleAuroraProducer');
    KAFKA_PROPS(3) := A2_TUPLE('key.serializer', 'org.apache.kafka.common.serialization.StringSerializer');
    KAFKA_PROPS(4) := A2_TUPLE('value.serializer', 'org.apache.kafka.common.serialization.StringSerializer');
    -- Create producer with name TestProducer
    OP_RESULT := A2_KAFKA_UTILS.CREATE_PRODUCER('TestProducer', KAFKA_PROPS);
    if ('SUCCESS' = OP_RESULT) then
        begin
            -- Build the message as key-value pairs and send it as JSON
            MSG_2_SEND.extend(3);
            MSG_2_SEND(1) := A2_TUPLE('INVOICE_ID', '13474');
            MSG_2_SEND(2) := A2_TUPLE('INVOICE_NUM', 'AP-2019-13474');
            MSG_2_SEND(3) := A2_TUPLE('INVOICE_AMOUNT', '2000');
            OP_RESULT := A2_KAFKA_UTILS.SEND_JSON_MESSAGE('TestProducer','TestTopic','Test-Message-Key',MSG_2_SEND);
            if ('SUCCESS' = OP_RESULT) then
                OP_RESULT := A2_KAFKA_UTILS.CLOSE_PRODUCER('TestProducer');
            else
                raise_application_error(-20000, OP_RESULT);
            end if;
        end;
    else
        raise_application_error(-20000, OP_RESULT);
    end if;

end;
/
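
On the Java side, an array of key-value tuples like KAFKA_PROPS has to end up as a java.util.Properties object before a Kafka producer can be built from it. The following is a minimal sketch of that step, not the wrapper's actual source: the class ProducerProps and its methods are hypothetical names, and the list of required keys reflects our understanding that a Kafka producer configured purely from properties needs bootstrap.servers, key.serializer and value.serializer, while client.id is optional.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical helper, for illustration only; not part of A2_KAFKA_UTILS.
public class ProducerProps {
    // Keys assumed to be required when the producer is configured from properties alone.
    static final String[] REQUIRED = {"bootstrap.servers", "key.serializer", "value.serializer"};

    // Copies {key, value} pairs into a Properties object.
    public static Properties toProperties(String[][] tuples) {
        Properties props = new Properties();
        for (String[] tuple : tuples) {
            props.setProperty(tuple[0], tuple[1]);
        }
        return props;
    }

    // Returns the required keys that are absent, in the order of REQUIRED.
    public static List<String> missingKeys(Properties props) {
        List<String> missing = new ArrayList<String>();
        for (String key : REQUIRED) {
            if (props.getProperty(key) == null) {
                missing.add(key);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        String[][] tuples = {
            {"bootstrap.servers", "kafka1.a2-solutions.eu:9092"},
            {"client.id", "KafkaOracleAuroraProducer"},
            {"key.serializer", "org.apache.kafka.common.serialization.StringSerializer"},
            {"value.serializer", "org.apache.kafka.common.serialization.StringSerializer"}
        };
        // All required keys are present, so this prints []
        System.out.println(missingKeys(toProperties(tuples)));
    }
}
```

Checking for missing keys before creating the producer gives a short, readable error instead of a long Kafka configuration exception.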

III. A2_KAFKA_UTILS functions

III.1 CREATE_PRODUCER

Creates Kafka producer

Parameters

PRODUCER_NAME - name of producer
PRODUCER_PROPS - array of tuples with Kafka producer properties

Returns

'SUCCESS' - for successful operation or errorstack in case of error
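
The "'SUCCESS' or errorstack" return convention used by the functions in this package can be pictured with the plain Java sketch below. It is an assumption about the general pattern, not the wrapper's actual code: ResultConvention and run are hypothetical names, and the real implementation may format the error stack differently.

```java
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.concurrent.Callable;

// Hypothetical helper, for illustration only; not part of A2_KAFKA_UTILS.
public class ResultConvention {
    // Runs the action and returns "SUCCESS", or the exception stack trace as text.
    public static String run(Callable<?> action) {
        try {
            action.call();
            return "SUCCESS";
        } catch (Exception e) {
            StringWriter buffer = new StringWriter();
            e.printStackTrace(new PrintWriter(buffer, true));
            return buffer.toString();
        }
    }

    public static void main(String[] args) {
        // An action that completes normally
        System.out.println(run(new Callable<Object>() {
            public Object call() {
                return null;
            }
        }));
        // An action that fails: the caller receives the stack trace as a string
        System.out.println(run(new Callable<Object>() {
            public Object call() {
                throw new IllegalStateException("broker not reachable");
            }
        }));
    }
}
```

Returning the stack as a string keeps Java exceptions from surfacing as ORA-29532 in the caller, at the cost that PL/SQL code must compare every result with 'SUCCESS', as the usage example does.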

III.2 CLOSE_PRODUCER

Closes Kafka producer

Parameters

PRODUCER_NAME - name of producer

Returns

'SUCCESS' - for successful operation or errorstack in case of error

III.3 SEND_STRING_MESSAGE

Sends text message to Kafka broker

Parameters

PRODUCER_NAME - name of producer
TOPIC - Kafka broker topic
MSG_KEY - Message key
MSG_VALUE - Message value

Returns

'SUCCESS' - for successful operation or errorstack in case of error

III.4 SEND_JSON_MESSAGE

JSONifies an array of tuples and sends the resulting string to the Kafka broker

Parameters

PRODUCER_NAME - name of producer
TOPIC - Kafka broker topic
MSG_KEY - Message key
ARRAY_OF_VALUES - Message value

Returns

'SUCCESS' - for successful operation or errorstack in case of error

III.5 ARRAY_2_JSON

Converts array of tuples (A2_ARRAY_OF_TUPLES) to JSON formatted string.

Parameters

ARRAY_OF_VALUES - in, A2_ARRAY_OF_TUPLES

Returns

JSON-formatted string representation of the given array of tuples
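
To make the conversion concrete, here is a minimal Java sketch of turning key-value tuples into a JSON string. It is written under assumptions: the output is taken to be one flat JSON object with every value emitted as a string, and TupleJson, escape and toJson are hypothetical names. The actual output format of ARRAY_2_JSON may differ.

```java
// Hypothetical helper, for illustration only; not the wrapper's ARRAY_2_JSON source.
public class TupleJson {
    // Escapes quotes, backslashes and control characters for use inside a JSON string.
    public static String escape(String s) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < s.length(); i++) {
            char c = s.charAt(i);
            switch (c) {
                case '"':  sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n"); break;
                case '\r': sb.append("\\r"); break;
                case '\t': sb.append("\\t"); break;
                default:
                    if (c < 0x20) {
                        // Remaining control characters become four-digit hex escapes
                        sb.append(String.format("\\u%04x", (int) c));
                    } else {
                        sb.append(c);
                    }
            }
        }
        return sb.toString();
    }

    // Builds one flat JSON object from {key, value} pairs, keeping their order.
    public static String toJson(String[][] tuples) {
        StringBuilder sb = new StringBuilder("{");
        for (int i = 0; i < tuples.length; i++) {
            if (i > 0) {
                sb.append(',');
            }
            sb.append('"').append(escape(tuples[i][0])).append("\":\"")
              .append(escape(tuples[i][1])).append('"');
        }
        return sb.append('}').toString();
    }

    public static void main(String[] args) {
        String[][] msg = {
            {"INVOICE_ID", "13474"},
            {"INVOICE_NUM", "AP-2019-13474"},
            {"INVOICE_AMOUNT", "2000"}
        };
        // prints {"INVOICE_ID":"13474","INVOICE_NUM":"AP-2019-13474","INVOICE_AMOUNT":"2000"}
        System.out.println(toJson(msg));
    }
}
```

Emitting every value as a string mirrors the fact that A2_TUPLE values are passed as text in the usage example; a consumer that needs numbers would have to parse them.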



17 comments:

  1. Very interesting post which helped us a great deal. We are now trying to connect to a Kafka server using SASL_PLAINTEXT or SASL_SSL as the security protocol from an Oracle database. Several additional Java grants were needed to create the KafkaProducer correctly. But when we then try to send a message, it returns the error:
    java.security.AccessControlException: the Permission ("javax.security.auth.PrivateCredentialPermission" "java.lang.String" "read") has not been granted to ProtectionDomain ...

    We tried to give several permissions with dbms_java.grant_permission, but none seem to work. Do you have any idea how to handle this?

    1. Had the same error. The grants did not fix it!

      I tried:
      exec dbms_java.grant_permission( 'MYUSER', 'SYS:javax.security.auth.PrivateCredentialPermission', 'java.lang.String', 'read' );

      exec dbms_java.grant_permission( 'MYUSER', 'javax.security.auth.PrivateCredentialPermission', '*', '' );


      The only thing that helped was to grant all permissions:

      exec dbms_java.grant_permission( 'MYUSER', 'SYS:java.security.AllPermission', '', '');

    2. Seems that PrivateCredentialPermission has special syntax. This way it worked for me (must be executed as sysdba):

      exec dbms_java.grant_permission( 'MYUSER', 'javax.security.auth.PrivateCredentialPermission', 'java.lang.String * "*"', 'read');

      And "AllPermission" is not necessary.

  2. Please provide the code that sends the message and the full error log.

    1. Hi,

      The following permissions were added:
      exec dbms_java.grant_permission( user, 'SYS:java.security.SecurityPermission', 'putProviderProperty.Simple SASL/PLAIN Server Provider', '');
      exec dbms_java.grant_permission( user, 'SYS:java.security.SecurityPermission', 'insertProvider', '');
      exec dbms_java.grant_permission( user, 'SYS:javax.security.auth.AuthPermission', 'modifyPublicCredentials', '' );
      exec dbms_java.grant_permission( user, 'SYS:javax.security.auth.AuthPermission', 'modifyPrivateCredentials', '' );
      exec dbms_java.grant_permission( user, 'SYS:javax.security.auth.AuthPermission', 'doAs', '' );
      exec dbms_java.grant_permission( user, 'SYS:javax.security.auth.AuthPermission', 'getSubject', '' );
      exec dbms_java.grant_permission( user, 'SYS:javax.security.auth.AuthPermission', 'createLoginContext.KafkaClient', '' );

      Then this works (returns SUCCESS):
      select A2_KAFKA_UTILS.CREATE_PRODUCER( PRODUCER_NAME => 'ItsMe'
      , PRODUCER_PROPS => A2_ARRAY_OF_TUPLES( A2_TUPLE('key.serializer', 'org.apache.kafka.common.serialization.StringSerializer')
      , A2_TUPLE('value.serializer', 'org.apache.kafka.common.serialization.StringSerializer')
      , A2_TUPLE('bootstrap.servers', 'confluent-kafka1.nix.infrabel.be:39092,confluent-kafka2.nix.infrabel.be:39092,confluent-kafka3.nix.infrabel.be:39092')
      , A2_TUPLE('security.protocol', 'SASL_PLAINTEXT')
      , A2_TUPLE('client.id', 'KafkaOracleAuroraProducer')
      , A2_TUPLE('ssl.truststore.location', '/tmp/kafka-broker.truststore.jks')
      , A2_TUPLE('ssl.truststore.password', 'mypwd')
      , A2_TUPLE('sasl.mechanism', 'PLAIN')
      , A2_TUPLE('sasl.jaas.config', 'org.apache.kafka.common.security.plain.PlainLoginModule required username="demo" password="demo";')
      )
      ) rslt from dual;

    2. But then this:
      select A2_KAFKA_UTILS.SEND_STRING_MESSAGE( PRODUCER_NAME => 'ItsMe'
      , TOPIC => 'demo'
      , MSG_KEY => 'MyKey'
      , MSG_VALUE => 'A string'
      ) rslt from dual;

      returns an error:
      org.apache.kafka.common.errors.TimeoutException: Topic demo not present in metadata after 10000 ms.
      org.apache.kafka.clients.producer.KafkaProducer$FutureFailure.<init>(KafkaProducer.java:1255)
      org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:916)
      org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:839)
      org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:726)
      eu.solutions.a2.aurora.kafka.KafkaUtils.sendStringMessage(KafkaUtils:99)

      In the log it says:

      java.security.AccessControlException: the Permission ("javax.security.auth.PrivateCredentialPermission" "java.lang.String" "read") has not been granted to ProtectionDomain ...

      Added extra permissions like:
      exec dbms_java.grant_permission( user, 'SYS:javax.security.auth.PrivateCredentialPermission', 'java.lang.String', 'read' );
      exec dbms_java.grant_permission( user, 'SYS:javax.security.auth.PrivateCredentialPermission', '*', 'read' );

      does not change anything.

    3. This comment has been removed by the author.
    4. Hello

      This error also generates an Oracle RDBMS trace file. Could you please post the content of this file here or send it directly to me (Ales.V@a2-solutions.eu)?

  3. Hello,
    Could you please provide the Kafka auth properties and the full error stack?
    BR,
    AV

  4. Hi Sergey,

    This is a very good article. Thanks for sharing such great work.

    I'm trying to configure Kafka 2.4.0 with Oracle database 19c. It raises the following exception when trying to load the JAR files:
    SQL> EXEC dbms_java.loadjava('-jarsasdbobjects -v -f -noverify /home/oracle/Downloads/log4j-1.2.17.jar');

    Error starting at line : 30 in command -
    BEGIN dbms_java.loadjava('-jarsasdbobjects -v -f -noverify /home/oracle/Downloads/log4j-1.2.17.jar'); END;
    Error report -
    ORA-29532: Java call terminated by uncaught Java exception: oracle.aurora.server.tools.loadjava.ToolsError:
    Error during loadjava: Failures occurred during processing.
    Check trace file for details
    ORA-06512: at "SYS.DBMS_JAVA", line 587
    ORA-06512: at line 1
    29532. 00000 - "Java call terminated by uncaught Java exception: %s"
    *Cause: A Java exception or error was signaled and could not be
    resolved by the Java code.
    *Action: Modify Java code, if this behavior is not intended.

    Do you know how I can resolve it?

    Many Thanks and

    Kind Regards,
    Bilal

    1. Hi Bill,

      Usually ORA-29532 generates a trace (.trc) file under diagnostic_dest.
      Could you please send it to oracle@a2-solutions.eu

      Also please look at https://github.com/averemee-si/oracdc - another solution of ours for integrating Oracle RDBMS and Kafka.

      Feel free to contact us.

      Regards,
      Aleksej

  5. Hey Sergey:

    I was trying to compile kafka_2.13-2.5.0, and I ran into a problem or two.

    Specifically, loadjava of slf4j-api-1.7.30.jar fails.

    The trace file says that there are objects (more than one) that can't be resolved. For example, ORA-29534: referenced object APPS.org/slf4j/helpers/NamedLoggerBase could not be resolved.

    I would very much like to work with you to see if we can get kafka_2.13-2.5.0 compiled into Oracle.

    Please let me know,

    Thanks, Ed Sileo (edward.sileo@gmail.com)

    1. I had the same error with "slf4j-api-1.7.25.jar". The additional -genmissing flag helped:

      begin
      dbms_java.loadjava('-r -v -f -noverify -genmissing /home/oracle/kafka_2.12-2.1.0/libs/slf4j-api-1.7.25.jar');
      end;
      /

    2. Hello Tõnu!
      Could you please send me your error stack (oracle@a2-solutions.eu)? It looks like you are trying to load classes compiled with Java 11 into the Oracle DB.

  6. Hello Sergey,

    I am trying to implement a similar approach: sending messages to a Kafka server from an Oracle 19c database.
    The Kafka instance is running on the same server as Oracle.
    Unfortunately it does not work.

    It works if I send a message from kafka-console-producer or from the simplest Java command-line application, but with the same producer properties it does not work from PL/SQL calling a Java function.

    It fails after 60 seconds, unable to retrieve metadata for the specific topic. I tried to create a Kafka consumer and just retrieve the list of topics; it fails with
    "Timeout expired while fetching topic metadata
    org.apache.kafka.clients.consumer.internals.Fetcher.getTopicMetadata (Fetcher.java)
    ....
    "
    It looks like it can't "access" port 9092, even though a simple call to UTL_TCP.Open_Connection with the server IP and port 9092 works.

    Any idea what could cause the problem?

    Thanks in advance for any help with this :)

    Best regards

    Artem

  7. Hello,
    Have you managed to get past the problem with "the Permission ("javax.security.auth.PrivateCredentialPermission" "java.lang.String" "read") has not been granted to ProtectionDomain"?

    I have tried some examples from previous comments, like "exec dbms_java.grant_permission( 'MY_USER', 'javax.security.auth.PrivateCredentialPermission', 'java.lang.String * "*"', 'read' );", but I always get the error:
    Exception in thread "Root Thread" java.lang.SecurityException: policy table update javax.security.auth.PrivateCredentialPermission, java.lang.String * "*"
    at oracle.aurora.rdbms.security.PolicyTableManager.checkPermission(PolicyTableManager.java:131)
    at oracle.aurora.rdbms.security.PolicyTableManager.activate(PolicyTableManager.java:546)
    at oracle.aurora.rdbms.security.PolicyTableManager.grant(PolicyTableManager.java:597)

  8. Hello Dawid, did you resolve the issue? We are also seeing a similar error when we use the command below:

    exec dbms_java.grant_permission( 'APPS', 'SYS:javax.security.auth.PrivateCredentialPermission', 'java.lang.String * "*"', 'read');

    ERROR at line 1:
    ORA-29532: Java call terminated by uncaught Java exception:
    java.lang.SecurityException: policy table update
    SYS:javax.security.auth.PrivateCredentialPermission, java.lang.String * "*"
    ORA-06512: at "SYS.DBMS_JAVA", line 705
    ORA-06512: at line 1


    We tried without the SYS: prefix, but to no avail.