Subtitle: Continuous Integration of Oracle Database with heterogeneous systems, e.g. Big Data

Hi there,

Basically, this blog post is about the Oracle database, but this time it is inspired by the fact that there is more than just Oracle.

In a discussion with colleagues we racked our brains over how to connect Oracle databases to big data platforms like Kafka or to NoSQL databases like MongoDB or Cassandra in a way where data changes are sent continuously. Well, you could use GoldenGate for Big Data or dbvisit replicate here. But these are licensed tools, and GoldenGate is even quite expensive. So we thought about using LogMiner directly, as the Integrated Extract of GoldenGate does nothing else internally. Well, it would work. But the more I thought about it, the more I came to the conclusion that it would be wiser to use Streams here, as this has the advantage that the API is much better prepared for all the needed tasks around managing the LogMiner sessions and grabbing all the needed data. Streams is deprecated but works well without any extra license, except that you need Enterprise Edition for the asynchronous capture, which uses LogMiner. Please do not confuse pure Streams with the Change Data Capture feature within the Oracle database, which is desupported as of 12c: see https://docs.oracle.com/database/121/UPGRD/deprecated.htm#UPGRD60083.

Well, there is one more constraint, and you might find it severe: you cannot use it in a container database environment. This is simply not supported. You will get "ORA-26932: Streams configuration is not allowed in a container database." or
"ORA-26931: This procedure can only be invoked from the root container." if you try to create a Streams process within a PDB.

The following shows a setup which prepares a table with the captured change data; in my case I called it activity_logger. I decided to include the following fields in a plain Oracle table:

  log_id          - surrogate key, filled by a sequence using the 12c feature "DEFAULT <sequence>.nextval"
  log_date        - date when the row was logged
  source_scn      - SCN of the LCR (logical change record)
  transaction_id  - transaction identifier, used to find multiple operations within a single transaction
  operation       - type of change: DDL, DML (INSERT, UPDATE, DELETE) or LOB operation
  schema_name     - schema
  table_name      - table name
  rec_key         - primary key of the record; I use the PL/SQL function extractPKFromLCR to derive it from an LCR
  lcr_xml         - XML representation of the LCR, created by dbms_streams.convert_lcr_to_xml
  lcr_sql         - SQL statement that represents the change
  lcr_json        - JSON of the change; for the JSON format I was inspired by the one used within
                    GoldenGate for Big Data and the Kafka Connect Handler.
                    I use the PL/SQL function lcr2json to derive it from an LCR
  lcr             - the pure LCR as an AnyData object, just in case I need it for any further debugging

Well, you could use this table also for auditing purposes, but the main idea - at least for this blog post - is to read it with one of the connectors for MongoDB, Cassandra, Kafka, etc. and send it to those systems in a suitable way. Later on you would omit the population of all the columns you do not need, just to save performance overhead and storage. For certain cases it might be better to have other structures for this change data, but as this is custom code anyway, there are no major limitations for the implementation. You also need to design a housekeeping strategy for this data to purge everything which has already been consumed or analyzed, as sketched below. For that it might be smart to partition the table.
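Just as an illustration of such a housekeeping job, a minimal sketch which assumes that everything older than seven days has already been consumed (the seven-day retention and the job name are my assumptions, not part of the setup below; with a partitioned table you would drop whole partitions instead of deleting rows):
BEGIN
  dbms_scheduler.create_job(
    job_name   => 'purge_activity_logger_job',   -- hypothetical job name
    job_type   => 'plsql_block',
    job_action => 'BEGIN DELETE FROM strmadmin.activity_logger WHERE log_date < sysdate - 7; COMMIT; END;',
    repeat_interval => 'freq=daily',
    comments   => 'housekeeping for the activity logger',
    enabled    => TRUE
  );
END;
/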

The source database in my example is Oracle 12.2. The source schema is HR. The activity_logger table is written to the strmadmin schema within that database.
The setup uses one capture and one apply process in a single database. In production environments you might want to offload both processes to another server and database; this is possible and is called a downstream capture configuration, a well-known concept in Streams, documented in https://docs.oracle.com/en/database/oracle/oracle-database/12.2/strms/introduction-to-oracle-streams.html#GUID-1DB18367-F886-45E1-B3ED-27EBAABE3EAA. Of course you could include propagations to send the LCRs via database link. But for the simplest cases this is not needed.

So here are the steps:

1. basic stuff

The database needs to be in archivelog mode, and setting force_logging is recommended:
ALTER DATABASE FORCE LOGGING;
The database needs to have minimal supplemental logging enabled:
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;
Check it with
SELECT force_logging, supplemental_log_data_min FROM v$database;
The parameter global_names should be set to true, but as we do not use database links this does not really matter. Of course you should have a streams pool set up, but if you use automatic memory management or automatic shared memory management and allow it to grow automatically, everything is fine with that.
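If you prefer to reserve memory for the Streams pool explicitly, a minimal sketch (the 256M value is just an assumption, size it for your workload):
ALTER SYSTEM SET streams_pool_size = 256M SCOPE = BOTH;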

2. the strmadmin user

CREATE USER strmadmin IDENTIFIED BY ***;
REM grant only restrictive privileges on strmadmin schema to avoid "accidents" where apply manipulates data in source schemas!!
GRANT CREATE SESSION, CREATE SEQUENCE, CREATE TABLE, CREATE PROCEDURE TO strmadmin;
ALTER USER strmadmin QUOTA UNLIMITED ON users;
REM this grants execute rights on DBMS streams packages and access to streams data dictionary
exec dbms_streams_auth.grant_admin_privilege( grantee => 'STRMADMIN' );
REM for the extractPKFromLCR function
GRANT SELECT ON dba_cons_columns TO strmadmin;
GRANT SELECT ON dba_constraints  TO strmadmin;
Note that the strmadmin user has no grants to select or change data in any HR table.
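If you want to double-check that the grant_admin_privilege call worked, you can query the Streams administrator view; a minimal sketch (column names from memory, treat them as an assumption):
SELECT username, local_privileges, access_from_remote FROM dba_streams_administrator;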

3. the strmadmin tables

CONNECT strmadmin/***
CREATE OR REPLACE FUNCTION extractPKFromLCR ( lcr IN sys.anydata ) RETURN VARCHAR2 IS ...
This function retrieves the primary key columns from the LCR as a string. If multiple columns are used, they are concatenated, delimited by an underscore character. Here again I was inspired by the way GoldenGate for Big Data typically does it when the ${primaryKeys} template keyword is used for the gg.handler.name.keyMappingTemplate property. See https://docs.oracle.com/goldengate/bd123110/gg-bd/GADBD/using-kafka-connect-handler.htm#GADBD-GUID-A87CAFFA-DACF-43A0-8C6C-5C64B578D606
CREATE OR REPLACE FUNCTION lcr2json ( lcr IN sys.anydata ) RETURN CLOB IS ...
This function creates a JSON document from the LCR.
Both functions are a little longer and thus omitted here for readability; you can download them from the sources which are attached to this blog entry. See attachment streams2Kafka.zip at the end.
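To give an idea of what extractPKFromLCR does, here is a heavily simplified sketch (the name extractPKFromLCR_sketch and the restriction to VARCHAR2 and NUMBER key columns are my assumptions; the attached version is more complete):
CREATE OR REPLACE FUNCTION extractPKFromLCR_sketch ( lcr IN sys.anydata ) RETURN VARCHAR2 IS
  v_lcr   SYS.LCR$_ROW_RECORD;
  t       PLS_INTEGER;
  v_val   SYS.AnyData;
  v_key   VARCHAR2(4000);
  v_owner VARCHAR2(128);
  v_table VARCHAR2(128);
BEGIN
  t := lcr.getObject(v_lcr);
  v_owner := v_lcr.get_object_owner;
  v_table := v_lcr.get_object_name;
  -- loop over the primary key columns of the source table, in constraint order
  FOR c IN (SELECT cc.column_name
              FROM dba_constraints co
              JOIN dba_cons_columns cc
                ON co.owner = cc.owner AND co.constraint_name = cc.constraint_name
             WHERE co.constraint_type = 'P'
               AND co.owner = v_owner
               AND co.table_name = v_table
             ORDER BY cc.position) LOOP
    -- take the NEW value if present (INSERT/UPDATE), otherwise the OLD value (DELETE)
    v_val := v_lcr.get_value('NEW', c.column_name);
    IF v_val IS NULL THEN
      v_val := v_lcr.get_value('OLD', c.column_name);
    END IF;
    IF v_val IS NOT NULL THEN
      -- concatenate the key columns, delimited by underscore
      v_key := v_key || CASE WHEN v_key IS NOT NULL THEN '_' END ||
               CASE v_val.getTypeName
                 WHEN 'SYS.VARCHAR2' THEN v_val.accessVarchar2
                 WHEN 'SYS.NUMBER'   THEN to_char(v_val.accessNumber)
               END;
    END IF;
  END LOOP;
  RETURN v_key;
END;
/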
CREATE SEQUENCE log_seq;
CREATE TABLE activity_logger
( log_id NUMBER DEFAULT log_seq.nextval,
  log_date DATE DEFAULT sysdate,
  source_scn NUMBER,
  transaction_id VARCHAR2(30),
  operation VARCHAR2(15),
  schema_name VARCHAR2(128),
  table_name VARCHAR2(128),
  rec_key VARCHAR2(4000),
  lcr_xml CLOB,
  lcr_sql CLOB,
  lcr_json CLOB,
  lcr sys.anydata
)
;
ALTER TABLE activity_logger ADD CONSTRAINT pk_activity_logger PRIMARY KEY (log_id);
CREATE TABLE heartbeat
 ( site varchar2(4000) CONSTRAINT heartbeat_pk PRIMARY KEY,
   stamp DATE)
;
The activity_logger table is the one I described above. The heartbeat table is my default table which I use in all my Streams and GoldenGate setups to monitor the health of the replication itself. A simple look into the table shows me whether replication is running fine or is broken.

4. the heartbeat table also in the source schema and its job

CONNECT hr/***
CREATE TABLE heartbeat
 ( site varchar2(4000) CONSTRAINT heartbeat_pk PRIMARY KEY,
   stamp DATE)
;
INSERT INTO heartbeat (site,stamp)
SELECT global_name, sysdate FROM global_name;
COMMIT;
BEGIN
  dbms_scheduler.create_job(
    job_name   => 'heartbeat_update_job',
    job_type   => 'plsql_block',
    job_action => 'update heartbeat set stamp = sysdate where site = (select global_name from global_name);',
    repeat_interval => 'freq=minutely;bysecond=0,15,30,45',
    comments   => 'heartbeat for streams activity logger',
    enabled    => FALSE
  );
END;
/
I created the heartbeat update job but left it disabled for now; it will be enabled as soon as the Streams instantiation is finished.


5. streams queue

CONNECT strmadmin/***
BEGIN
  dbms_streams_adm.set_up_queue( queue_table => 'STREAMS_QUEUE_TABLE', queue_name => 'STREAMS_QUEUE');
END;
/
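A quick sanity check after this step is a look into the queue data dictionary, for example:
SELECT name, queue_table, queue_type FROM dba_queues WHERE owner = 'STRMADMIN';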

6. streams apply and capture processes

CONNECT strmadmin/***
BEGIN
  dbms_apply_adm.create_apply ( queue_name => 'STREAMS_QUEUE', apply_name => 'APP', apply_captured => true);
END;
/
BEGIN
  dbms_capture_adm.create_capture ( queue_name => 'STREAMS_QUEUE', capture_name => 'CAP', checkpoint_retention_time => 1);
END;
/
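Both processes are created in a disabled state and will only be started in step 11. You can already see them in the dictionary, for example:
SELECT capture_name, status FROM dba_capture;
SELECT apply_name, status FROM dba_apply;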

7. Capture rules

In my case I decided to capture all tables in the HR schema and to create individual rules for every table. The only table I really want to apply in a regular Streams way is heartbeat; hence I rename the schema for it.
CONNECT strmadmin/***
DECLARE
  v_dml_rule VARCHAR2(200);
  v_ddl_rule VARCHAR2(200);
BEGIN
  FOR i IN (SELECT table_name FROM dba_tables WHERE owner='HR') LOOP
    dbms_streams_adm.add_table_rules(
      table_name         => 'HR.' || i.table_name,
      streams_type       => 'CAPTURE',
      streams_name       => 'CAP',
      queue_name         => 'STREAMS_QUEUE',
      include_dml        => true,
      include_ddl        => true,
      include_tagged_lcr => false,
      source_database    => null,
      dml_rule_name      => v_dml_rule,
      ddl_rule_name      => v_ddl_rule,
      inclusion_rule     => true
    );
    IF i.table_name = 'HEARTBEAT' THEN
      dbms_streams_adm.rename_schema(
        rule_name => v_dml_rule,
        from_schema_name => 'HR',
        to_schema_name => 'STRMADMIN'
      );
    END IF;
  END LOOP;
END;
/

You can query dba_streams_rules and dba_streams_transformations after this block to verify that everything was set up correctly, for example with the query below.
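A minimal example of such a check (the column selection is from memory, treat it as an assumption and compare with the documentation of your release):
SELECT streams_type, streams_name, rule_type, schema_name, object_name, rule_name
FROM dba_streams_rules
WHERE streams_name = 'CAP';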

8. prepare tables for capture

Normally the tables for which capture rules are set up should already be prepared correctly for capture; that means supplemental log groups for PK, UK and FK should be created (like you do with "add trandata" in GoldenGate). Either you check that in dba_capture_prepared_tables and dba_log_groups, or you simply run the following block to be sure that this is done accurately.
CONNECT strmadmin/***
BEGIN
  FOR i IN (SELECT table_name FROM dba_tables
            WHERE owner='HR'
            AND temporary='N'
            AND (iot_type != 'IOT_OVERFLOW' OR iot_type IS NULL)
           ) LOOP
    dbms_capture_adm.prepare_table_instantiation('HR.' || i.table_name, supplemental_logging => 'KEYS');
  END LOOP;
END;
/
If you want supplemental log groups for all columns here, you would set the parameter supplemental_logging to 'ALL'. You can check the result in dba_capture_prepared_tables, for example with the query below.
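A hedged sketch of such a check (column names from memory):
SELECT table_owner, table_name, supplemental_log_data_pk, supplemental_log_data_uk, supplemental_log_data_fk
FROM dba_capture_prepared_tables
WHERE table_owner = 'HR';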

9. DML and DDL handler

Now comes the slightly tricky part. We set up handlers which record the changes in our activity_logger table rather than actually applying them to real tables. The exception is the heartbeat table.
CONNECT strmadmin/***
REM the ddl handler
CREATE OR REPLACE PROCEDURE ddl_handler (lcr_anydata IN SYS.AnyData)
AS
  lcr      SYS.LCR$_DDL_RECORD;
  t        PLS_INTEGER;
  v_xml    CLOB;
  v_sql    CLOB;
BEGIN
  v_xml := dbms_streams.convert_lcr_to_xml(lcr_anydata).getclobval;
  dbms_lob.createtemporary(v_sql, cache => TRUE);
  t := lcr_anydata.getObject(lcr);
  lcr.get_ddl_text(v_sql);
  INSERT INTO activity_logger
    (source_scn,
     transaction_id,
     operation,
     schema_name,
     table_name,
     rec_key,
     lcr_xml,
     lcr_sql,
     lcr_json,
     lcr)
  VALUES (
     lcr.get_scn,
     lcr.get_transaction_id,
     'DDL',
     lcr.get_object_owner,
     lcr.get_object_name,
     'DDL_' || trim(to_char(lcr.get_scn,rpad(9,63,9))),
     v_xml,
     v_sql,
     lcr2json(lcr_anydata),
     lcr_anydata
     );
  dbms_lob.freetemporary(v_sql);
END;
/

There are a lot more functions on the LCR data types which could be used; see the documentation https://docs.oracle.com/en/database/oracle/oracle-database/12.2/arpls/Logical-Change-Record-TYPEs.html#GUID-DF24B84A-4430-41CF-9104-20046FB18D10. The data could be iterated and processed here in a more suitable way, depending on the use case. We could even store the LCRs as AnyData objects only, to allow the usage of the LCR processing APIs asynchronously from the Streams apply process.
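As an illustration of such iteration, a minimal sketch of a helper that walks over all NEW column values of a row LCR (the procedure name dump_new_values is hypothetical; in a real handler you would process the values instead of printing the column names):
CREATE OR REPLACE PROCEDURE dump_new_values (lcr_anydata IN SYS.AnyData)
IS
  lcr  SYS.LCR$_ROW_RECORD;
  t    PLS_INTEGER;
  vals SYS.LCR$_ROW_LIST;
BEGIN
  t := lcr_anydata.getObject(lcr);
  vals := lcr.get_values('NEW');          -- all NEW column values of the row LCR
  IF vals IS NOT NULL THEN
    FOR i IN 1 .. vals.COUNT LOOP
      -- each entry carries the column name and the value as AnyData
      dbms_output.put_line(vals(i).column_name);
    END LOOP;
  END IF;
END;
/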

REM associate this DDL handler with the apply
BEGIN
  dbms_apply_adm.alter_apply(
    apply_name  => 'APP',
    ddl_handler => 'STRMADMIN.DDL_HANDLER'
  );
END;
/
REM similarly we create a dml handler
CREATE OR REPLACE PROCEDURE dml_handler (lcr_anydata IN SYS.AnyData)
IS
  lcr      SYS.LCR$_ROW_RECORD;
  t        PLS_INTEGER;
  v_xml    CLOB;
  v_sql    CLOB;
BEGIN
  v_xml := dbms_streams.convert_lcr_to_xml(lcr_anydata).getclobval;
  dbms_lob.createtemporary(v_sql, cache => TRUE);
  t := lcr_anydata.getObject(lcr);
  lcr.get_row_text(v_sql);
  INSERT INTO activity_logger
    (source_scn,
     transaction_id,
     operation,
     schema_name,
     table_name,
     rec_key,
     lcr_xml,
     lcr_sql,
     lcr_json,
     lcr)
  VALUES (
     lcr.get_scn,
     lcr.get_transaction_id,
     lcr.get_command_type,
     lcr.get_object_owner,
     lcr.get_object_name,
     extractPKFromLCR(lcr_anydata),
     v_xml,
     trim(v_sql),
     lcr2json(lcr_anydata),
     lcr_anydata
     );
  dbms_lob.freetemporary(v_sql);
END;
/
REM associate this DML handler with the apply for all tables in HR except heartbeat and all operations (operation_name => 'DEFAULT')
BEGIN
  FOR i IN (SELECT table_name FROM dba_tables WHERE owner='HR' AND table_name NOT IN ('HEARTBEAT')) LOOP
    dbms_apply_adm.set_dml_handler(
      object_name        => 'HR.' || i.table_name,
      object_type        => 'TABLE',
      operation_name     => 'DEFAULT',
      user_procedure     => 'STRMADMIN.DML_HANDLER',
      apply_name         => 'APP'
    );
  END LOOP;
END;
/
Note that the actual execution of the LCR, which is normally done in apply handlers, is missing here. It would be accomplished with "lcr.execute()". You can check that all this was correctly set up in the views dba_apply (ddl_handler column) and dba_apply_dml_handlers, for example with the query below.
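A hedged sketch of such a check (treat the exact column list as an assumption):
SELECT object_owner, object_name, operation_name, user_procedure
FROM dba_apply_dml_handlers
WHERE apply_name = 'APP';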

10. Instantiation

CONNECT / AS SYSDBA
REM I connect with sys here to have all permissions on strmadmin.heartbeat and hr.heartbeat
INSERT INTO strmadmin.heartbeat SELECT * FROM hr.heartbeat;
COMMIT;
CONNECT strmadmin/***
DECLARE
  v_global_name VARCHAR2(128);
BEGIN
  SELECT global_name INTO v_global_name FROM global_name;
  dbms_apply_adm.set_schema_instantiation_scn(
    source_schema_name   => 'HR',
    source_database_name => v_global_name,
    instantiation_scn    => dbms_flashback.get_system_change_number,
    recursive            => TRUE
  );
END;
/
Now we can enable the heartbeat update job:
CONNECT hr/***
exec dbms_scheduler.enable('HEARTBEAT_UPDATE_JOB');
Check the result in dba_apply_instantiated_objects and dba_apply_instantiated_schemas, for example with the query below.
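A minimal sketch (column names from memory, treat them as an assumption):
SELECT source_schema, instantiation_scn FROM dba_apply_instantiated_schemas;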

11. Let's start it

CONNECT strmadmin/***
exec dbms_apply_adm.start_apply('APP');
exec dbms_capture_adm.start_capture('CAP');

12. Let's monitor it

Have a look at the alert log. You should see something like:
Streams APPLY AP01 for APP started with pid=69, OS id=38516
Streams CAPTURE CP01 for CAP started with pid=70, OS id=38520
Starting persistent Logminer Session with sid = 4 for Streams Capture CAP
...
LOGMINER: Begin mining logfile for session 4 thread 1 sequence 1363, /u01/oradata/NIREUS/redog1m1NIREUS.dbf

Now it is time to monitor the heartbeat.

CONNECT strmadmin/***
SELECT stamp FROM heartbeat;
If everything is fine the query should show a recent time.
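Beyond the heartbeat you can also look at the Streams views directly; a hedged sketch (the column selection is my choice, check the reference of your release):
SELECT capture_name, state, total_messages_captured FROM v$streams_capture;
SELECT apply_name, state, total_applied FROM v$streams_apply_coordinator;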

13. testing some DDL and DML on the HR schema

CONNECT hr/***
I did the following here:
CREATE TABLE emp_backup AS SELECT * FROM emp;
TRUNCATE TABLE emp;
INSERT INTO emp SELECT * FROM emp_backup;
COMMIT;
UPDATE emp SET SALARY = SALARY * 1.10;
COMMIT;
DELETE FROM emp;
COMMIT;
INSERT INTO emp SELECT * FROM emp_backup;
COMMIT;

14. Let's report on this

SELECT operation, transaction_id, count(*)
FROM strmadmin.activity_logger
GROUP BY operation, transaction_id;

OPERATION       TRANSACTION_ID                   COUNT(*)
--------------- ------------------------------ ----------
DDL             1.12.1913                               1
UPDATE          1.16.1918                             107
DELETE          4.2.1920                              107
INSERT          9.33.2486                             107
INSERT          4.4.1920                              107

5 rows selected.

The creation of the emp_backup table is not recorded, which is intended, as I have capture rules per table, not per schema. For each manipulated row in the DML statements I have one recorded row in my logging table.

SET LONG 100000
SET LINES 1200
COLUMN lcr_xml FORMAT A1100
COLUMN lcr_sql FORMAT A1100
SET PAGES 0
SELECT lcr_xml FROM strmadmin.activity_logger WHERE operation='DDL';

<DDL_LCR xmlns="http://xmlns.oracle.com/streams/schemas/lcr" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://xmlns.oracle.com/streams/schemas/lcr  http://xmlns.oracle.com/streams/schemas/lcr/streamslcr.xsd">
  <source_database_name>NIREUS</source_database_name>
  <command_type>TRUNCATE TABLE</command_type>
  <current_schema>HR</current_schema>
  <ddl_text>TRUNCATE TABLE emp </ddl_text>
  <object_type>TABLE</object_type>
  <object_owner>HR</object_owner>
  <object_name>EMP</object_name>
  <logon_user>HR</logon_user>
  <base_table_owner>HR</base_table_owner>
  <base_table_name>EMP</base_table_name>
  <transaction_id>1.12.1913</transaction_id>
  <scn>4549459</scn>
</DDL_LCR>

SET LINES 80
SET PAGES 100
SELECT lcr_sql FROM strmadmin.activity_logger WHERE operation='UPDATE' AND rownum < 2;

LCR_SQL
--------------------------------------------------------------------------------
 UPDATE "HR"."EMP" SET "SALARY"=2750 WHERE "EMPLOYEE_ID"=182
 AND "SALARY"=2500

Okay, looks good, all the needed information is included.

And how do you get it to the other systems? Write a program that fetches the data from the activity_logger table and sends it to whatever system you need. See an example for Kafka: streams2Kafka.java in attachment streams2Kafka.zip at the end of this blog post. You would need to enhance the example to make it ready for production: better exception handling, maybe multi-threading or some kind of parallel processing, running in an endless loop as a daemon, etc. The core polling logic is sketched below.
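The heart of such a program is usually just a polling query that remembers the last consumed key, something along these lines (the bind variable name is my choice):
SELECT log_id, schema_name, table_name, operation, lcr_json
FROM strmadmin.activity_logger
WHERE log_id > :last_consumed_log_id
ORDER BY log_id;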


Summary: it is possible to get change data out of an Oracle database quite easily using Oracle Streams. It is deprecated but does not need any extra license, only Enterprise Edition. GoldenGate would be another option, but it brings in more complexity and license costs compared to this rather simple setup.
 
Cheers Mathias