Using an aggregation function to query a JSON-string straight from SQL

Last week I read this blog post by Scott Wesley, in which he describes how he uses a custom aggregate function to create large JSON strings.
For that he used a solution described in this post by Carsten Czarski. Scott's post reminded me of a post by my colleague Lucas Jellema, in which he uses the “normal” listagg aggregation function. When Lucas wrote his post I thought that I could beat the 4000-character limit of his approach with a custom aggregate function.

I started out with Tom Kyte's stragg_type (see here), and just changed the type of the stragg_type string attribute to clob and the return type of the functions to clob.

That worked: aggregating strings of 4000 bytes was no problem, nor was 10000, but for larger strings it became slower and slower.
Too slow: 15000 bytes took 15 seconds.

So I changed the type back to varchar2, but with a size of varchar2(32767).
That worked, and fast. But only for strings shorter than 29989 bytes. For larger strings I would get an
ORA-22813: operand value exceeds system limits

To solve that I added a clob attribute, just as Carsten Czarski does in his listagg.
I used the varchar2 string for speed and, as soon as the result became too large, the clob for size.
And that worked too. But as soon as the aggregated string size exceeded 58894 bytes the ORA-22813 popped up again.
And as soon as the odciaggregatemerge function used the clob, another error appeared: ORA-22922: nonexistent LOB value
So I gave up: 4000 bytes is a nice limit for JSON, and if you want something bigger you have to use PL/SQL. Or so I thought.

But after reading Scott's post I compared my code with the code of Carsten Czarski to see how he solved my problems.
It turned out that the first one was easy to solve: just limit the varchar2 string to 4000 again.
And Carsten’s odciaggregatemerge function will raise an ORA-22922 too. I expect that it is an Oracle bug :)
But because the odciaggregatemerge function is only executed when the optimizer decides to run the aggregating query in parallel, you can aggregate very large strings without ever seeing that error.
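
The "varchar2 for speed, clob for size" idea can be sketched outside the aggregate as an anonymous block. This is only an illustration under assumptions: the append procedure and the local variables are hypothetical stand-ins for the object attributes, and the 4000 threshold is the limit mentioned above, not a value taken from the real type body.

```plsql
declare
  t_varchar2 varchar2(32767);  -- fast buffer, kept at 4000 characters or less
  t_clob     clob;             -- overflow storage for everything already flushed
  -- Hypothetical helper: append a value, flushing the buffer to the clob when needed.
  procedure append( p_val varchar2 )
  is
  begin
    if nvl( length( t_varchar2 ), 0 ) + nvl( length( p_val ), 0 ) > 4000
    then
      if t_clob is null
      then
        dbms_lob.createtemporary( t_clob, true );
      end if;
      if t_varchar2 is not null
      then
        dbms_lob.writeappend( t_clob, length( t_varchar2 ), t_varchar2 );
      end if;
      t_varchar2 := p_val;     -- start a new buffer with the current value
    else
      t_varchar2 := t_varchar2 || p_val;
    end if;
  end;
begin
  for i in 1 .. 1000
  loop
    append( rpad( 'x', 100, 'x' ) );
  end loop;
  -- total length = what is already in the clob plus what is still in the buffer
  dbms_output.put_line( nvl( dbms_lob.getlength( t_clob ), 0 ) + nvl( length( t_varchar2 ), 0 ) );
end;
```

With serveroutput on this prints 100000: most appends only touch the varchar2 buffer, and the clob is written once per roughly 4000 characters.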

So, now it’s time to introduce my JSON aggregator. It’s a custom aggregate function, which aggregates a query into a JSON-array. The elements of this array are JSON-objects.

create or replace type agg_json as object
( t_varchar2 varchar2(32767)
, t_clob clob
, static function odciaggregateinitialize( sctx in out agg_json )
  return number
, member function odciaggregateiterate
    ( self in out agg_json
    , a_val dbmsoutput_linesarray
    )
  return number
, member function odciaggregateterminate
    ( self in out agg_json
    , returnvalue out clob
    , flags in number
    )
  return number
, member function odciaggregatemerge
    ( self in out agg_json
    , ctx2 in out agg_json
    )
  return number
, static function json_obj( p_obj dbmsoutput_linesarray )
  return varchar2
, static function escape_json( p_val varchar2 )
  return varchar2
)

Just a type with two attributes, the standard functions for implementing a custom aggregation function, and two supporting static functions.
But notice the a_val parameter of odciaggregateiterate: dbmsoutput_linesarray is a varray of varchar2(32767).
Every name-value pair in the JSON object is formed by 3 entries in that varray:
The first entry is the name of the name-value pair.
The second entry is the value of the name-value pair.
The third entry is an indicator for the type of the value: in the examples below '' is used for a string, 'n' for a number, 'o' for a nested object and 'a' for a nested array.
The fourth entry is the name of the second name-value pair.
The fifth entry is the value of the second name-value pair, and so on.
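
To make the layout concrete, here is a minimal sketch of how such a varray of triples could be turned into one JSON object. The function json_obj_sketch is a hypothetical stand-in, not the real agg_json.json_obj, and it assumes that an empty indicator means "quote the value" while any other indicator emits the value as-is. It does no escaping at all.

```plsql
declare
  -- Hypothetical stand-in for agg_json.json_obj (assumption, no escaping).
  function json_obj_sketch( p_obj dbmsoutput_linesarray )
  return varchar2
  is
    t_json varchar2(32767);
    t_idx  pls_integer := 1;
  begin
    -- walk the varray three entries at a time: name, value, indicator
    while t_idx + 2 <= p_obj.count
    loop
      t_json := t_json
             || case when t_idx > 1 then ',' end
             || '"' || p_obj( t_idx ) || '":'
             || case
                  when p_obj( t_idx + 2 ) is null  -- '' is null in Oracle: a string value
                  then '"' || p_obj( t_idx + 1 ) || '"'
                  else p_obj( t_idx + 1 )          -- 'n', 'o' or 'a': emit the value unquoted
                end;
      t_idx := t_idx + 3;
    end loop;
    return '{' || t_json || '}';
  end;
begin
  dbms_output.put_line( json_obj_sketch( dbmsoutput_linesarray( 'id', '1', 'n', 'name', 'Scott', '' ) ) );
end;
```

With serveroutput on this prints {"id":1,"name":"Scott"}.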

With the type in place, create the aggregation function and you can generate JSON:

create or replace function json_agg( agg dbmsoutput_linesarray )
return clob
parallel_enable aggregate using agg_json;

For example, this query

select json_agg( dbmsoutput_linesarray( 'id', level, 'n'
                                      , 'name', level, '' 
                                      , 'test', 'test' || level, ''
                                      )
               ) 
from dual
connect by level <= 3

produces this JSON

 [{"id":1,"name":"1","test":"test1"}
,{"id":2,"name":"2","test":"test2"}
,{"id":3,"name":"3","test":"test3"}]

And to get the JSON from Lucas's example, nest two calls to this new aggregation function:

select agg_json.json_obj
         ( dbmsoutput_linesarray
             ( 'company'
             , json_agg
                 ( dbmsoutput_linesarray
                     ( 'name', d.dname, ''
                     , 'identifier', d.deptno, ''
                     , 'location', d.loc, ''
                     , 'employees'
                     , json_agg
                         ( dbmsoutput_linesarray
                             ( 'name', e.ename, ''
                             , 'job', e.job, ''
                             , 'salary', e.sal, 'n'
                             , 'manager'
                             , nvl( ( select agg_json.json_obj
                                               ( dbmsoutput_linesarray
                                                   ( 'name', m.ename, ''
                                                   , 'salary', m.sal, 'n'
                                                   , 'hiredate', to_char( m.hiredate, 'DD-MM-YYYY' ), ''
                                                   )
                                               )
                                      from emp m
                                      where m.empno = e.mgr
                                    )
                                  , '{}'
                                  )
                             , 'o'
                             , 'hiredate', to_char( e.hiredate, 'DD-MM-YYYY' ), ''
                             )
                         )
                     , 'a'
                     )
                 )
             , 'a'
             )
         )
from dept d
   , emp e
where d.deptno = e.deptno
group by d.dname, d.deptno, d.loc

Here is the code.

FTPS with PL/SQL

Doing an FTP job with PL/SQL is not difficult.
A basic implementation of RFC 959 can be written in a few hundred lines. See for instance ORACLE-BASE, How to FTP with Oracle PL/SQL or Oracle FAQ’s.
But what if you want to secure your FTP transmission? Google doesn’t find any pure PL/SQL solutions, only Java solutions with a PL/SQL wrapper.
Securing FTP transmission can be done in two ways: SFTP and FTPS.
SFTP stands for SSH File Transfer Protocol. This protocol is not based on the plain FTP protocol; implementing it in PL/SQL is not more difficult, just a lot more work. You will not find anything about SFTP in this posting; maybe I will talk about that later.
FTPS stands for FTP Secure (RFC 4217, Securing FTP with TLS), and it is secured with the TLS protocol. Oracle added support for this protocol to UTL_TCP in version 11.2.0.2.
As with https, you need a certificate (chain) of the server in an Oracle Wallet.
You have to pass the path of this wallet to the utl_tcp.open_connection function:

t_conn := utl_tcp.open_connection( p_host
                                 , p_port
                                 , wallet_path => p_wallet_path
                                 , wallet_password => p_wallet_password
                                 ); -- open connection

Then you tell the server that you want to secure the connection:

t_reply := write_cmd( t_conn, 'AUTH', 'TLS' );

If the server accepts this (reply code 234), secure the connection:

utl_tcp.secure_connection( t_conn );

That’s enough to secure the control channel of the FTP connection!

Doing the same for the data channel requires just two more commands

t_reply := write_cmd( t_conn, 'PBSZ', '0' );
t_reply := write_cmd( t_conn, 'PROT', 'P' );
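
The data channel itself is opened in passive mode: the server's 227 reply carries the host and port as six comma-separated numbers, and the port is the fifth number times 256 plus the sixth. A small worked example of that parsing, using the same substr/instr steps as open_data_channel in the complete example; the reply text here is made up for illustration.

```plsql
declare
  -- Made-up PASV reply, for illustration only (assumption).
  t_tmp  varchar2(100) := '227 Entering Passive Mode (192,168,1,10,19,137)';
  t_host varchar2(100);
  t_port pls_integer;
begin
  -- keep only the part between the parentheses
  t_tmp  := substr( t_tmp, instr( t_tmp, '(' ) + 1 );
  t_tmp  := substr( t_tmp, 1, instr( t_tmp, ')' ) - 1 );
  -- the first four numbers form the IP address
  t_host := replace( substr( t_tmp, 1, instr( t_tmp, ',', 1, 4 ) - 1 ), ',', '.' );
  -- the last two numbers form the port: high byte * 256 + low byte
  t_tmp  := substr( t_tmp, instr( t_tmp, ',', 1, 4 ) + 1 );
  t_port := to_number( substr( t_tmp, 1, instr( t_tmp, ',' ) - 1 ) ) * 256
          + to_number( substr( t_tmp, instr( t_tmp, ',' ) + 1 ) );
  dbms_output.put_line( t_host || ':' || t_port );
end;
```

With serveroutput on this prints 192.168.1.10:5001, because 19 * 256 + 137 = 5001.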

A complete example:

declare
  t_conn utl_tcp.connection;
  t_blob blob;
  t_clob clob;
  t_reply number;
  type tp_dir_listing is table of varchar2(32767) index by pls_integer;
  t_dir_listing tp_dir_listing;
--
  g_response varchar2(1000 char);
  g_wallet_path     varchar2(1000 char);
  g_wallet_password varchar2(1000 char);
--
  function read_reply( p_conn in out nocopy utl_tcp.connection, p_verbose boolean := true )
  return number
  is
    t_line varchar2(32767);
    t_code varchar2(3 char);
  begin
    t_line := utl_tcp.get_line( p_conn, true );
    -- The rest of this function was lost from the page. What follows is a
    -- reconstruction (an assumption), based on how g_response and the reply codes are used below.
    if nvl( length( t_line ), 0 ) < 3
    then
      raise_application_error( -20001, 'Unexpected FTP reply: ' || t_line, true );
    end if;
    t_code := substr( t_line, 1, 3 );
    -- a multi-line reply starts with "nnn-" and ends with a line starting with "nnn "
    if substr( t_line, 4, 1 ) = '-'
    then
      loop
        t_line := utl_tcp.get_line( p_conn, true );
        exit when substr( t_line, 1, 3 ) = t_code and nvl( substr( t_line, 4, 1 ), ' ' ) = ' ';
      end loop;
    end if;
    g_response := substr( t_line, 1, 1000 ); -- keep the last reply line for error messages and PASV parsing
    return to_number( t_code );
  end;
--
  function write_cmd( p_conn in out nocopy utl_tcp.connection, p_cmd varchar2, p_param varchar2 := '' )
  return number
  is
    t_dummy pls_integer;
  begin
    t_dummy := utl_tcp.write_line( p_conn, p_cmd || rtrim( ' ' || p_param ) );
    return read_reply( p_conn );
  end;
--
  function open_data_channel( p_conn in out nocopy utl_tcp.connection )
  return utl_tcp.connection
  is
    t_tmp varchar2(32767); 
    t_host varchar2(1000 char); 
    t_port pls_integer; 
    t_conn utl_tcp.connection;
  begin
    if write_cmd( p_conn, 'PASV' ) != 227
    then
      raise_application_error( -20001, 'Could not set PASSIVE: ' || g_response, true );
    end if;
    t_tmp := g_response;
    if (   instr( t_tmp, '(' ) > 0
       and instr( t_tmp, '(', 1, 2 ) = 0
       and instr( t_tmp, ')' ) > 0
       and instr( t_tmp, ')', 1, 2 ) = 0
       )
    then
      t_tmp := substr( t_tmp, instr( t_tmp, '(' ) + 1 ); 
      t_tmp := substr( t_tmp, 1, instr( t_tmp, ')' ) - 1 );
    else
      t_tmp := translate( t_tmp, '1234567890,' || t_tmp, '1234567890,' );
    end if;      
    t_host := substr( t_tmp, 1, instr( t_tmp, ',', 1, 4 ) - 1 ); 
    t_host := replace( t_host, ',', '.' );
    t_tmp := substr( t_tmp, instr( t_tmp, ',', 1, 4 ) + 1 );
    t_port := to_number( substr( t_tmp, 1, instr( t_tmp, ',' ) - 1 ) );
    t_port := t_port * 256;
    t_port := t_port + to_number( substr( t_tmp, instr( t_tmp, ',' ) + 1 ) );
    t_conn := utl_tcp.open_connection( t_host, t_port, wallet_path => g_wallet_path, wallet_password => g_wallet_password );
    return t_conn; 
  end; 
--
  function login
    ( p_host varchar2
    , p_usr  varchar2 := null
    , p_pw   varchar2 := null
    , p_port number := 21
    , p_account varchar2 := null
    , p_wallet_path varchar2 := null
    , p_wallet_password varchar2 := null
    )
  return utl_tcp.connection
  is
    t_reply number;
    t_conn utl_tcp.connection;
    t_usr varchar2(1000 char);
    t_pw  varchar2(1000 char);
    t_acc varchar2(1000 char);
  begin
    t_conn := utl_tcp.open_connection( p_host, p_port, wallet_path => p_wallet_path, wallet_password => p_wallet_password ); -- open connection
    case read_reply( t_conn ) 
      when 120
      then
        raise_application_error( -20001, 'FTP server not ready: ' || g_response, true );
      when 421
      then
        raise_application_error( -20001, 'FTP server not available: ' || g_response, true );
      else
        null;
    end case;
    if write_cmd( t_conn, 'AUTH', 'TLS' ) = 234 
    then
      utl_tcp.secure_connection( t_conn );
      if     write_cmd( t_conn, 'PBSZ', '0' ) = 200  
         and write_cmd( t_conn, 'PROT', 'P' ) = 200
      then
        g_wallet_path     := p_wallet_path;
        g_wallet_password := p_wallet_password;
      end if;
    end if;
    if p_usr is null
    then
      t_usr := 'anonymous';
      t_pw  := 'anonymous@mysite.com';  
      t_acc := '*********';
    else  
      t_usr := p_usr;
      t_pw  := p_pw;  
      t_acc := p_account;
    end if;
    t_reply := write_cmd( t_conn, 'USER', t_usr );  
    if t_reply = 331
    then
      t_reply := write_cmd( t_conn, 'PASS', t_pw );  
    end if;  
    if t_reply = 332
    then
      t_reply := write_cmd( t_conn, 'ACCT', t_acc );  
    end if;  
    if t_reply not in ( 230, 202 )
    then
      raise_application_error( -20001, 'Could not log in: ' || g_response, true );
    end if;
    return t_conn;
  end;
--
  procedure get_file
    ( p_conn in out nocopy utl_tcp.connection
    , p_path in varchar2
    , p_file in out blob
    )
  is
    t_reply number;
    t_buf raw(32767);
    t_cnt pls_integer;
    t_conn utl_tcp.connection;
  begin
    t_reply := write_cmd( p_conn, 'TYPE', 'I' );    
    t_conn := open_data_channel( p_conn );
    t_reply := write_cmd( p_conn, 'RETR', p_path );    
    if t_reply in ( 125, 150 )
    then
      if g_wallet_path is not null
      then
        utl_tcp.secure_connection( t_conn );
      end if;
      dbms_lob.createtemporary( p_file, true );
      begin 
        loop
          t_cnt := utl_tcp.read_raw( t_conn, t_buf, 32767 );
          dbms_lob.writeappend( p_file, t_cnt, t_buf );
        end loop;
      exception
        when utl_tcp.end_of_input
        then
          null;
      end;
      utl_tcp.close_connection( t_conn );
      t_reply := read_reply( p_conn );
    end if;
    if t_reply not in ( 226, 250 )
    then
      raise_application_error( -20001, 'Could not retrieve file: ' || g_response, true );
    end if;
  end;
--
  procedure get_file
    ( p_conn in out nocopy utl_tcp.connection
    , p_path in varchar2
    , p_file in out clob
    )
  is
    t_reply number;
    t_buf varchar2(32767);
    t_cnt pls_integer;
    t_conn utl_tcp.connection;
  begin
    t_reply := write_cmd( p_conn, 'TYPE', 'A' );    
    t_conn := open_data_channel( p_conn );
    t_reply := write_cmd( p_conn, 'RETR', p_path );    
    if t_reply in ( 125, 150 )
    then
      if g_wallet_path is not null
      then
        utl_tcp.secure_connection( t_conn );
      end if;
      dbms_lob.createtemporary( p_file, true );
      begin 
        loop
          t_buf := utl_i18n.raw_to_char( utl_tcp.get_raw( t_conn, 32767 ), 'US7ASCII' );
          dbms_lob.writeappend( p_file, length( t_buf ), t_buf );
        end loop;
      exception
        when utl_tcp.end_of_input
        then
          null;
      end;
      utl_tcp.close_connection( t_conn );
      t_reply := read_reply( p_conn );
    end if;
    if t_reply not in ( 226, 250 )
    then
      raise_application_error( -20001, 'Could not retrieve file: ' || g_response, true );
    end if;
  end;
--
  procedure nlst
    ( p_conn in out nocopy utl_tcp.connection
    , p_dir_listing in out tp_dir_listing
    , p_path in varchar2 := null
    )
  is
    t_reply number;
    t_conn utl_tcp.connection;
  begin
    p_dir_listing.delete;
    t_reply := write_cmd( p_conn, 'TYPE', 'A' );    
    t_conn := open_data_channel( p_conn );
    t_reply := write_cmd( p_conn, 'NLST', p_path );    
    if t_reply in ( 125, 150 )
    then
      if g_wallet_path is not null
      then
        utl_tcp.secure_connection( t_conn );
      end if;
      begin 
        loop
          p_dir_listing( p_dir_listing.count + 1 ) := utl_tcp.get_line( t_conn, true ); 
        end loop;
      exception
        when utl_tcp.end_of_input
        then
          null;
      end;
      utl_tcp.close_connection( t_conn );
      t_reply := read_reply( p_conn );
    end if;
    if t_reply != 226
    then
      raise_application_error( -20001, 'Could not get NLST: ' || g_response, true );
    end if;
  end;
--
begin
  t_conn := login( 'localhost', p_wallet_path => 'file:C:\oracle\wallet', p_wallet_password => 'amis.rules.again' );
--  get_file( t_conn, '/my_dir/my_ascii_file.txt', t_clob );  
  nlst( t_conn, t_dir_listing, '*.txt' );  
  t_reply := write_cmd( t_conn, 'QUIT' );
  utl_tcp.close_connection( t_conn );
end;

You will find the same example here

Select a blob across a database link, without getting ORA-22992

Just a quick blog post about a simple trick to select a blob across a database link, especially for a colleague of mine, Harry Dragstra.

Say, you have a table with a blob on a remote database

SQL> describe test_blob@xx
 Name                                      Null?    Type
 ----------------------------------------- -------- ----------------------------
 ID                                                 NUMBER
 BLB                                                BLOB

When you use a normal select statement to get all columns you run into an error