dbTalk Databases Forums  

dynamically created cursor doesn't work for parallel pipelined functions

comp.databases.oracle.server comp.databases.oracle.server


Discuss dynamically created cursor doesn't work for parallel pipelined functions in the comp.databases.oracle.server forum.



Reply
 
Thread Tools Display Modes
  #1  
Old   
Frank Bergemann
 
Posts: n/a

Default dynamically created cursor doesn't work for parallel pipelined functions - 12-03-2010 , 03:05 PM






=======================
please see this code sample:
=======================

drop table parallel_test;

CREATE TABLE parallel_test (
id NUMBER(10),
description VARCHAR2(50)
);

BEGIN
FOR i IN 1 .. 50000 LOOP
INSERT INTO parallel_test (id, description)
VALUES (i, 'Description or ' || i);
END LOOP;
COMMIT;
END;
/

-- Define a strongly typed REF CURSOR type.

CREATE OR REPLACE PACKAGE parallel_ptf_api AS

TYPE t_parallel_test_row IS RECORD (
id1 NUMBER(10),
desc1 VARCHAR2(50),
id2 NUMBER(10),
desc2 VARCHAR2(50),
sid NUMBER
);

TYPE t_parallel_test_tab IS TABLE OF t_parallel_test_row;

TYPE t_parallel_test_ref_cursor IS REF CURSOR RETURN
t_parallel_test_row;

FUNCTION test_ptf (p_cursor IN t_parallel_test_ref_cursor)
RETURN t_parallel_test_tab PIPELINED
PARALLEL_ENABLE(PARTITION p_cursor BY any);

END parallel_ptf_api;
/
SHOW ERRORS

CREATE OR REPLACE PACKAGE BODY parallel_ptf_api AS

FUNCTION test_ptf (p_cursor IN t_parallel_test_ref_cursor)
RETURN t_parallel_test_tab PIPELINED
PARALLEL_ENABLE(PARTITION p_cursor BY any)
IS
l_row t_parallel_test_row;
BEGIN
LOOP
FETCH p_cursor
INTO l_row;
EXIT WHEN p_cursor%NOTFOUND;

select userenv('SID') into l_row.sid from dual;

PIPE ROW (l_row);
END LOOP;
RETURN;
END test_ptf;

END parallel_ptf_api;
/
SHOW ERRORS

PROMPT
PROMPT Serial Execution
PROMPT ================
SELECT sid, count(*)
FROM TABLE(parallel_ptf_api.test_ptf(CURSOR(SELECT t1.id,
t1.description, t2.id, t2.description, null
FROM parallel_test t1,
parallel_test t2
where t1.id = t2.id
)
)
) t2
GROUP BY sid;

PROMPT
PROMPT Parallel Execution
PROMPT ==================
SELECT sid, count(*)
FROM TABLE(parallel_ptf_api.test_ptf(CURSOR(SELECT /*+
parallel(t1,5) */ t1.id, t1.description, t2.id, t2.description, null
FROM parallel_test t1,
parallel_test t2
where t1.id = t2.id
)
)
) t2
GROUP BY sid;

PROMPT
PROMPT Parallel Execution 2
PROMPT ==================

set serveroutput on;

declare
v_sids ton := ton();
v_counts ton := ton();
-- v_cur parallel_ptf_api.t_parallel_test_ref_cursor;
v_cur sys_refcursor;

procedure OpenCursor(p_refCursor out sys_refcursor)
is
begin
open p_refCursor for 'SELECT /*+ parallel(t1,5) */ t1.id,
t1.description, t2.id, t2.description, null
FROM parallel_test t1,
parallel_test t2
where t1.id = t2.id';
end;

begin

OpenCursor(v_cur);

SELECT sid, count(*) bulk collect into v_sids, v_counts
FROM TABLE(parallel_ptf_api.test_ptf(v_cur)) t2
GROUP BY sid;

for i in v_sids.FIRST.. v_sids.LAST loop
dbms_output.put_line (v_sids(i) || ', ' || v_counts(i));
end loop;
end;
/


==============
The output is:
==============

SQL> @test

Table dropped.


Table created.


PL/SQL procedure successfully completed.


Package created.

No errors.

Package body created.

No errors.

Serial Execution
================

SID COUNT(*)
---------- ----------
649 50000


Parallel Execution
==================

SID COUNT(*)
---------- ----------
435 10012
440 9970
537 9970
459 9924
371 10124


Parallel Execution 2
==================
649, 50000

PL/SQL procedure successfully completed.

===============================================

But why doesn't it use multiple SIDs for 'Parallel Execution 2'?
Is there any way to make it work?

- many thanks!

Frank

Reply With Quote
  #2  
Old   
Frank Bergemann
 
Posts: n/a

Default Re: dynamically created cursor doesn't work for parallel pipelined functions - 12-03-2010 , 03:07 PM






i forgot to tell:
The code example is modelled on
http://www.dba-oracle.com/plsql/t_pl..._functions.htm

rgds,
Frank

Reply With Quote
  #3  
Old   
Frank Bergemann
 
Posts: n/a

Default Re: dynamically created cursor doesn't work for parallel pipelined functions - 12-04-2010 , 08:48 AM



================================================== ====
a solution to start with - but i have to get around some dependencies:
================================================== ====

drop table parallel_test;

drop type MyDoit;

drop type BaseDoit;

CREATE TABLE parallel_test (
id NUMBER(10),
description VARCHAR2(50)
);

BEGIN
FOR i IN 1 .. 50000 LOOP
INSERT INTO parallel_test (id, description)
VALUES (i, 'Description or ' || i);
END LOOP;
COMMIT;
END;
/


create or replace type BaseDoit as object (
id number,
member procedure doit(
p_sids in out nocopy ton,
p_counts in out nocopy ton)
) not final;
/

create or replace type body BaseDoit as
member procedure doit(
p_sids in out nocopy ton,
p_counts in out nocopy ton)
is
begin
dbms_output.put_line('BaseDoit.doit() invoked');
end;
end;
/

-- Define a strongly typed REF CURSOR type.

CREATE OR REPLACE PACKAGE parallel_ptf_api AS

TYPE t_parallel_test_row IS RECORD (
id1 NUMBER(10),
desc1 VARCHAR2(50),
id2 NUMBER(10),
desc2 VARCHAR2(50),
sid NUMBER
);

TYPE t_parallel_test_tab IS TABLE OF t_parallel_test_row;

TYPE t_parallel_test_ref_cursor IS REF CURSOR RETURN
t_parallel_test_row;

FUNCTION test_ptf (p_cursor IN t_parallel_test_ref_cursor)
RETURN t_parallel_test_tab PIPELINED
PARALLEL_ENABLE(PARTITION p_cursor BY any);

END parallel_ptf_api;
/
SHOW ERRORS

CREATE OR REPLACE PACKAGE BODY parallel_ptf_api AS

FUNCTION test_ptf (p_cursor IN t_parallel_test_ref_cursor)
RETURN t_parallel_test_tab PIPELINED
PARALLEL_ENABLE(PARTITION p_cursor BY any)
IS
l_row t_parallel_test_row;
BEGIN
LOOP
FETCH p_cursor
INTO l_row;
EXIT WHEN p_cursor%NOTFOUND;

select userenv('SID') into l_row.sid from dual;

PIPE ROW (l_row);
END LOOP;
RETURN;
END test_ptf;

END parallel_ptf_api;
/
SHOW ERRORS

PROMPT
PROMPT Serial Execution
PROMPT ================
SELECT sid, count(*)
FROM TABLE(parallel_ptf_api.test_ptf(CURSOR(SELECT t1.id,
t1.description, t2.id, t2.description, null
FROM parallel_test t1,
parallel_test t2
where t1.id = t2.id
)
)
) t2
GROUP BY sid;

PROMPT
PROMPT Parallel Execution
PROMPT ==================
SELECT sid, count(*)
FROM TABLE(parallel_ptf_api.test_ptf(CURSOR(SELECT /*+
parallel(t1,5) */ t1.id, t1.description, t2.id, t2.description, null
FROM parallel_test t1,
parallel_test t2
where t1.id = t2.id
)
)
) t2
GROUP BY sid;

PROMPT
PROMPT Parallel Execution 2
PROMPT ==================

set serveroutput on;

declare
v_sids ton := ton();
v_counts ton := ton();
-- v_cur parallel_ptf_api.t_parallel_test_ref_cursor;
v_cur sys_refcursor;

procedure OpenCursor(p_refCursor out sys_refcursor)
is
begin
open p_refCursor for 'SELECT /*+ parallel(t1,5) */ t1.id,
t1.description, t2.id, t2.description, null
FROM parallel_test t1,
parallel_test t2
where t1.id = t2.id';
end;

begin

OpenCursor(v_cur);

SELECT sid, count(*) bulk collect into v_sids, v_counts
FROM TABLE(parallel_ptf_api.test_ptf(v_cur)) t2
GROUP BY sid;

for i in v_sids.FIRST.. v_sids.LAST loop
dbms_output.put_line (v_sids(i) || ', ' || v_counts(i));
end loop;
end;
/

PROMPT
PROMPT Parallel Execution 3
PROMPT ==================

set serveroutput on;

declare

procedure CreateMyDoit
is
cmd varchar2(4096 char);
begin
cmd := 'create or replace type MyDoit under BaseDoit ( ' ||
' overriding member procedure doit( ' ||
' p_sids in out nocopy ton, ' ||
' p_counts in out nocopy ton) ' ||
' )';
execute immediate cmd;

cmd := 'create or replace type body MyDoit as ' ||
' overriding member procedure doit( ' ||
' p_sids in out nocopy ton, ' ||
' p_counts in out nocopy ton) ' ||
' is ' ||
' begin ' ||
' dbms_output.put_line(''MyDoit.doit() invoked''); ' ||

' SELECT sid, count(*) bulk collect into p_sids, p_counts ' ||
' FROM TABLE(parallel_ptf_api.test_ptf(CURSOR( ' ||
' SELECT /*+ parallel(t1,5) */ t1.id, t1.description,
t2.id, t2.description, null ' ||
' FROM parallel_test t1, parallel_test t2 ' ||
' where t1.id = t2.id ' ||
' ))) ' ||
' GROUP BY sid; ' ||


' end; ' ||
' end; ';
execute immediate cmd;

end;

begin

CreateMyDoit;

end;
/

declare
v_sids ton := ton();
v_counts ton := ton();

instance BaseDoit;
begin

instance := MyDoit(1);

instance.doit(v_sids, v_counts);

for i in v_sids.FIRST.. v_sids.LAST loop
dbms_output.put_line (v_sids(i) || ', ' || v_counts(i));
end loop;

end;
/

==============
The output is:
==============

SQL> @test

Table dropped.


Type dropped.


Type dropped.


Table created.


PL/SQL procedure successfully completed.


Type created.


Type body created.


Package created.

No errors.

Package body created.

No errors.

Serial Execution
================

SID COUNT(*)
---------- ----------
649 50000


Parallel Execution
==================

SID COUNT(*)
---------- ----------
457 10124
390 10012
555 9970
389 9924
603 9970


Parallel Execution 2
==================
649, 50000

PL/SQL procedure successfully completed.


Parallel Execution 3
==================

PL/SQL procedure successfully completed.

MyDoit.doit() invoked
468, 9970
483, 9924
389, 10012
368, 10124
341, 9970

PL/SQL procedure successfully completed.

SQL>

Reply With Quote
  #4  
Old   
Frank Bergemann
 
Posts: n/a

Default Re: dynamically created cursor doesn't work for parallel pipelined functions - 12-04-2010 , 01:12 PM



===============================
wow, i am surprised, what's all possible:
===============================

drop table parallel_test;

drop type MyDoit;

drop type BaseDoit;

CREATE TABLE parallel_test (
id NUMBER(10),
description VARCHAR2(50)
);

BEGIN
FOR i IN 1 .. 100000 LOOP
INSERT INTO parallel_test (id, description)
VALUES (i, 'Description or ' || i);
END LOOP;
COMMIT;
END;
/


create or replace type BaseDoit as object (
id number,
static function make(p_id in number)
return BaseDoit,
member procedure doit(
p_sids in out nocopy ton,
p_counts in out nocopy ton)
) not final;
/

create or replace type body BaseDoit as
static function make(p_id in number)
return BaseDoit
is
begin
return new BaseDoit(p_id);
end;

member procedure doit(
p_sids in out nocopy ton,
p_counts in out nocopy ton)
is
begin
dbms_output.put_line('BaseDoit.doit(' || id || ') invoked...');
end;
end;
/

-- Define a strongly typed REF CURSOR type.

CREATE OR REPLACE PACKAGE parallel_ptf_api AS

TYPE t_parallel_test_row IS RECORD (
id1 NUMBER(10),
desc1 VARCHAR2(50),
id2 NUMBER(10),
desc2 VARCHAR2(50),
sid NUMBER
);

TYPE t_parallel_test_tab IS TABLE OF t_parallel_test_row;

TYPE t_parallel_test_ref_cursor IS REF CURSOR RETURN
t_parallel_test_row;

FUNCTION test_ptf (p_cursor IN t_parallel_test_ref_cursor)
RETURN t_parallel_test_tab PIPELINED
PARALLEL_ENABLE(PARTITION p_cursor BY any);

END parallel_ptf_api;
/
SHOW ERRORS

CREATE OR REPLACE PACKAGE BODY parallel_ptf_api AS

FUNCTION test_ptf (p_cursor IN t_parallel_test_ref_cursor)
RETURN t_parallel_test_tab PIPELINED
PARALLEL_ENABLE(PARTITION p_cursor BY any)
IS
l_row t_parallel_test_row;
BEGIN
LOOP
FETCH p_cursor
INTO l_row;
EXIT WHEN p_cursor%NOTFOUND;

select userenv('SID') into l_row.sid from dual;

PIPE ROW (l_row);
END LOOP;
RETURN;
END test_ptf;

END parallel_ptf_api;
/
SHOW ERRORS

PROMPT
PROMPT Serial Execution
PROMPT ================
SELECT sid, count(*)
FROM TABLE(parallel_ptf_api.test_ptf(CURSOR(SELECT t1.id,
t1.description, t2.id, t2.description, null
FROM parallel_test t1,
parallel_test t2
where t1.id = t2.id
)
)
) t2
GROUP BY sid;

PROMPT
PROMPT Parallel Execution
PROMPT ==================
SELECT sid, count(*)
FROM TABLE(parallel_ptf_api.test_ptf(CURSOR(SELECT /*+
parallel(t1,5) */ t1.id, t1.description, t2.id, t2.description, null
FROM parallel_test t1,
parallel_test t2
where t1.id = t2.id
)
)
) t2
GROUP BY sid;

PROMPT
PROMPT Parallel Execution 2
PROMPT ==================

set serveroutput on;

declare
v_sids ton := ton();
v_counts ton := ton();
-- v_cur parallel_ptf_api.t_parallel_test_ref_cursor;
v_cur sys_refcursor;

procedure OpenCursor(p_refCursor out sys_refcursor)
is
begin
open p_refCursor for 'SELECT /*+ parallel(t1,5) */ t1.id,
t1.description, t2.id, t2.description, null
FROM parallel_test t1,
parallel_test t2
where t1.id = t2.id';
end;

begin

OpenCursor(v_cur);

SELECT sid, count(*) bulk collect into v_sids, v_counts
FROM TABLE(parallel_ptf_api.test_ptf(v_cur)) t2
GROUP BY sid;

for i in v_sids.FIRST.. v_sids.LAST loop
dbms_output.put_line (v_sids(i) || ', ' || v_counts(i));
end loop;
end;
/

PROMPT
PROMPT Parallel Execution 3
PROMPT ==================

set serveroutput on;

declare

instance BaseDoit;
v_sids ton := ton();
v_counts ton := ton();

procedure CreateMyDoit
is
cmd varchar2(4096 char);
begin
cmd := 'create or replace type MyDoit under BaseDoit ( ' ||
' static function make(p_id in number) ' ||
' return MyDoit, ' ||
' overriding member procedure doit( ' ||
' p_sids in out nocopy ton, ' ||
' p_counts in out nocopy ton) ' ||
' )';
execute immediate cmd;

cmd := 'create or replace type body MyDoit as ' ||
' static function make(p_id in number) ' ||
' return MyDoit ' ||
' is ' ||
' begin ' ||
' return new MyDoit(p_id); ' ||
' end; ' ||
' ' ||
' overriding member procedure doit( ' ||
' p_sids in out nocopy ton, ' ||
' p_counts in out nocopy ton) ' ||
' is ' ||
' begin ' ||
' dbms_output.put_line(''MyDoit.doit('' || id || '')
invoked...''); ' ||

' SELECT sid, count(*) bulk collect into p_sids, p_counts ' ||
' FROM TABLE(parallel_ptf_api.test_ptf(CURSOR( ' ||
' SELECT /*+ parallel(t1,5) */ t1.id, t1.description,
t2.id, t2.description, null ' ||
' FROM parallel_test t1, parallel_test t2 ' ||
' where t1.id = t2.id ' ||
' ))) ' ||
' GROUP BY sid; ' ||


' end; ' ||
' end; ';
execute immediate cmd;

end;

begin

CreateMyDoit;
execute immediate 'select MyDoit.Make(11) from dual' into instance;
instance.doit(v_sids, v_counts);

if v_sids.COUNT > 0 then
for i in v_sids.FIRST.. v_sids.LAST loop
dbms_output.put_line (v_sids(i) || ', ' || v_counts(i));
end loop;
end if;

end;
/

==============
The output is:
==============

SQL> @test

Table dropped.


Type dropped.


Type dropped.


Table created.


PL/SQL procedure successfully completed.


Type created.


Type body created.


Package created.

No errors.

Package body created.

No errors.

Serial Execution
================

SID COUNT(*)
---------- ----------
400 100000


Parallel Execution
==================

SID COUNT(*)
---------- ----------
429 20130
458 20035
547 20040
594 19886
390 19909


Parallel Execution 2
==================
400, 100000

PL/SQL procedure successfully completed.


Parallel Execution 3
==================
MyDoit.doit(11) invoked...
429, 19886
390, 20130
368, 20035
519, 19909
458, 20040

PL/SQL procedure successfully completed.

SQL>

Reply With Quote
Reply




Thread Tools
Display Modes

Posting Rules
You may not post new threads
You may not post replies
You may not post attachments
You may not edit your posts

vB code is On
Smilies are On
[IMG] code is On
HTML code is Off



Powered by vBulletin Version 3.5.3
Copyright ©2000 - 2012, Jelsoft Enterprises Ltd.