====== Oracle 12c - Datapump Export mit PL/SQL - Import/Export aus der Datenbank starten ======
DB: 11g / 12c / 18c
**Aufgabe:** Ein Schema soll zwischen zwei Datenbanken regelmäßig "kopiert"/"abgeglichen" werden.
**Lösung:**
PL/SQL Code steuert DataPump direkt aus der Datenbank heraus, damit ist kein Zugriff auf das Betriebssystem für den User notwendig.
Das Kopieren erfolgt dabei über einen Datenbank Link.
Kompletter Source Code in der aktuellsten Version siehe hier => https://github.com/gpipperr/OraPowerShell/tree/master/Ora_SQLPlus_SQLcL_sql_scripts/datapump
----
==== DataPump über einen DB Link aufrufen ====
=== Datapump Import über das Netz ===
Über einen DB Link fragt das Zielsystem das Quellsystem ab und fügt die Daten im Zielsystem ein:
|--------| |--------|
| GPITST | <==== DB Link | GPIPRD |
|--------| |--------|
source destination
Der Export erfolgt von der "source database" in die "destination database", d.h. der DB Link wird in der "destination database" auf die "source database" angelegt!
Zu Datapump siehe auch => [[dba:datapump_import|Oracle Data Pump Schema Export und Import]]
----
==== Umsetzung ====
=== Test Schemas anlegen ===
**Source** DB **GPITST**
-- Source database: create the schema owner whose objects are pulled over the DB link.
create user BESTDBA identified by "xxxxxxxx";
-- Each statement needs its own terminator, otherwise SQL*Plus keeps waiting for input.
grant connect, resource to BESTDBA;
-- Data Pump export role for the schema owner on the source side.
grant DATAPUMP_EXP_FULL_DATABASE to BESTDBA;
connect BESTDBA/"xxxxxxxx"
-- create test data
SQL> create table t_all_objects as select * from all_objects;
Table created.
SQL> select count(*) from t_all_objects;
COUNT(*)
----------
86820
**Target/Destination** DB **GPIPRD**
-- Target database: create the schema owner that runs the Data Pump import.
create user BESTDBA identified by "xxxxxxx";
grant connect, resource to BESTDBA;
-- Data Pump import role for the schema owner on the target side.
grant DATAPUMP_IMP_FULL_DATABASE to BESTDBA;
-- Granted directly (not only through a role) because the job is started from PL/SQL.
grant create table, create procedure to bestdba;
-- The OS path is a string literal and therefore needs single quotes;
-- double quotes denote an identifier and make the statement fail.
CREATE directory BACKUP AS '/opt/oracle/acfs/import';
GRANT READ,WRITE ON directory BACKUP TO BESTDBA;
connect BESTDBA/"xxxxxxxx"
-- Link from the target to the source database; the import reads through it.
CREATE DATABASE LINK DP_TRANSFER CONNECT TO BESTDBA IDENTIFIED BY "xxxxxxx" USING 'GPITSTDB';
SQL> select global_name from global_name@DP_TRANSFER;
GLOBAL_NAME
--------------------------------------------------------------------------------
GPITST
Bzw. bestehenden User wie folgt anpassen!
----
=== User Rechte vergeben===
Rollen für den Export/Import der Datenbank vergeben:
* DATAPUMP_EXP_FULL_DATABASE
* DATAPUMP_IMP_FULL_DATABASE
Auf Source Database:
-- Data Pump export role for the schema owner on the source database.
grant DATAPUMP_EXP_FULL_DATABASE to BESTDBA;
Auf Target Database:
-- Data Pump import role for the user that runs the job on the target database.
grant DATAPUMP_IMP_FULL_DATABASE to BESTDBA;
-- Direct grants are required in addition to the role; without them the job fails
-- with ORA-31626 "job does not exist" when started from PL/SQL.
grant create table, create procedure to bestdba;
----
=== DB Link von der „destination/Target database“ auf die Source anlegen ===
Der Export erfolgt von der „source database“ in die „destination database“, d.h. der DB Link wird in der „destination database“ auf die „source database“ angelegt!
Anlegen nach folgendem Muster unter dem User, der auch den Export durchführen soll:
-- Template: replace mylink, remote_user, remote_pwd and remote_db with the real values.
CREATE DATABASE LINK mylink CONNECT TO remote_user IDENTIFIED BY remote_pwd USING 'remote_db';
Ist das Passwort des Users nicht bekannt, den DB Link über eine Hilfsfunktion anlegen: [[dba:create_db_link_other_schema|Einen DB Link in einem anderem Schema anlegen]]
In einer Oracle Cluster Umgebung darauf achten, dass der TNS Alias auch in der TNSNAMES.ora des Clusters (wie User "Grid") steht!
----
=== DB Directory für das Log anlegen ===
Damit wir später auch das Log lesen können, importieren wir das Log als External Table.
Directory anlegen (im Cluster darauf achten, dass es auch vom Grid User geschrieben werden kann und von beiden Seiten des Clusters erreicht wird!):
-- Directory object for the Data Pump log file.
-- The name BACKUP is the one the import procedures and the DP_DUMP_LOG external table refer to.
-- The OS path is a string literal, so it must be in single quotes (double quotes mean an identifier).
create directory BACKUP as '/opt/oracle/acfs/import';
grant read,write on directory BACKUP to bestdba;
Rechte an dem Directory dem User geben!
----
==== PL/SQL Code für den Start von DataPump -das komplette Schema importieren ====
Mit diesem Code lässt sich dann ein ganzes Schema synchronisieren:
CREATE OR REPLACE PROCEDURE dp_import_user_schema
-- +----------------------------------
--
-- Imports the complete schema of the calling user over the database link
-- DP_TRANSFER by starting a Data Pump network import job from PL/SQL.
-- The job is only started here; it keeps running after the procedure returns.
-- The Data Pump log is written to db_import_plsql.log in directory BACKUP.
--
-- Runs as an autonomous transaction, so it always ends with COMMIT or ROLLBACK.
-- Any error is re-raised after the Data Pump handle has been released.
--
-- testcall exec dp_import_user_schema
--
-- +----------------------------------
IS
   PRAGMA AUTONOMOUS_TRANSACTION;
   v_dp_handle      NUMBER;
   -- TRUE once start_job succeeded; from then on the job must not be stopped by the cleanup
   v_job_started    BOOLEAN := FALSE;
   v_db_directory   varchar2(200) := 'BACKUP';
   v_db_link        varchar2(200) := 'DP_TRANSFER';
   -- job name is only unique per user, day of month and hour
   v_job_name       varchar2(256) := user ||'_IMPORT' || TO_CHAR (SYSDATE, 'DD_HH24');
   -- fixed name so that the log can be read later through an external table
   v_log_file_name  varchar2(256) := 'db_import_plsql.log';
BEGIN
   dbms_output.put_line(' -- Import Parameter ------------' );
   dbms_output.put_line(' -- DB Link :: '|| v_db_link );
   dbms_output.put_line(' -- DB DIRECTORY :: '|| v_db_directory);
   dbms_output.put_line(' -- DP JOB Name :: '|| v_job_name);
   dbms_output.put_line(' -- DP Log File :: '|| v_log_file_name);

   -- Create Data Pump Handle - "IMPORT" in this case
   v_dp_handle := DBMS_DATAPUMP.open (operation   => 'IMPORT'
                                    , job_mode    => 'SCHEMA'
                                    , job_name    => v_job_name
                                    , remote_link => v_db_link);

   -- No PARALLEL
   DBMS_DATAPUMP.set_parallel (handle => v_dp_handle, degree => 1);

   -- Consistent read of the source data as of the start of the job
   DBMS_DATAPUMP.SET_PARAMETER(
        handle => v_dp_handle
      , name   => 'FLASHBACK_TIME'
      , value  => 'systimestamp'
   );

   -- import the complete schema of the calling user
   DBMS_DATAPUMP.metadata_filter (handle => v_dp_handle
                                , name   => 'SCHEMA_EXPR'
                                , VALUE  => 'IN ('''||user||''')');

   -- Logfile
   DBMS_DATAPUMP.add_file (handle    => v_dp_handle
                          ,filename  => v_log_file_name
                          ,filetype  => DBMS_DATAPUMP.KU$_FILE_TYPE_LOG_FILE
                          ,directory => v_db_directory
                          ,reusefile => 1 -- overwrite existing files
                          ,filesize  => '10000M');

   -- Do it!
   DBMS_DATAPUMP.start_job (handle => v_dp_handle);
   v_job_started := TRUE;
   COMMIT;
   DBMS_DATAPUMP.detach (handle => v_dp_handle);
EXCEPTION
   WHEN OTHERS THEN
      -- A job that was opened but never started stays in state DEFINING and blocks
      -- the handle for this session; remove it before reporting the error.
      IF v_dp_handle IS NOT NULL AND NOT v_job_started THEN
         BEGIN
            DBMS_DATAPUMP.stop_job (handle => v_dp_handle, immediate => 1, keep_master => 0);
         EXCEPTION
            WHEN OTHERS THEN
               -- best effort: at least release the handle, the original error is re-raised below
               BEGIN
                  DBMS_DATAPUMP.detach (handle => v_dp_handle);
               EXCEPTION
                  WHEN OTHERS THEN
                     NULL;
               END;
         END;
      END IF;
      -- an autonomous transaction must be ended before control leaves the procedure
      ROLLBACK;
      RAISE;
END dp_import_user_schema;
/
==== Nur eine einzelne Tabelle importieren ====
Code, um eine einzelne Tabelle des Schemas zu importieren:
CREATE OR REPLACE PROCEDURE dp_import_table(p_tablename varchar2
                                          , p_mode      varchar2)
--- +----------------------------------
--
-- Imports one table of the calling user's schema over the database link
-- DP_TRANSFER by starting a Data Pump network import job from PL/SQL.
-- The job is only started here; it keeps running after the procedure returns.
--
-- p_tablename : name of the table, compared as given (no case conversion)
-- p_mode      : TABLE_EXISTS_ACTION - TRUNCATE, REPLACE, APPEND or SKIP (case-insensitive)
--
-- Raises -20000 if p_mode is missing or not one of the supported values,
--        -20001 if p_tablename is missing.
-- Any other error is re-raised after the Data Pump handle has been released.
--
-- testcall exec dp_import_table(p_tablename => 'T_ALL_OBJECTS', p_mode=> 'REPLACE')
--
-- +----------------------------------
IS
   PRAGMA AUTONOMOUS_TRANSACTION;
   v_dp_handle      NUMBER;
   -- TRUE once start_job succeeded; from then on the job must not be stopped by the cleanup
   v_job_started    BOOLEAN := FALSE;
   v_db_directory   varchar2(200) := 'BACKUP';
   v_db_link        varchar2(200) := 'DP_TRANSFER';
   -- job name is only unique per user, day of month and hour
   v_job_name       varchar2(256) := user ||'_IMPORT' || TO_CHAR (SYSDATE, 'DD_HH24');
   -- use same name to import the data later via external table
   v_log_file_name  varchar2(256) := 'db_import_plsql.log';
BEGIN
   dbms_output.put_line(' -- Import Parameter ------------' );
   dbms_output.put_line(' -- Tablename :: '|| p_tablename );
   dbms_output.put_line(' -- Replace Modus :: '|| p_mode);
   dbms_output.put_line(' -- DB Link :: '|| v_db_link );
   dbms_output.put_line(' -- DB DIRECTORY :: '|| v_db_directory);
   dbms_output.put_line(' -- DP JOB Name :: '|| v_job_name);
   dbms_output.put_line(' -- DP Log File :: '|| v_log_file_name);

   -- NOT IN evaluates to NULL for a NULL mode, so the NULL case is tested explicitly
   if p_mode is null
      or upper(p_mode) not in ('TRUNCATE', 'REPLACE', 'APPEND', 'SKIP') then
      RAISE_APPLICATION_ERROR (-20000, '-- Error :: This Tablemode is not supported ::'||p_mode);
   end if;

   if p_tablename is null then
      RAISE_APPLICATION_ERROR (-20001, '-- Error :: No table name given');
   end if;

   -- Create Data Pump Handle - "IMPORT" in this case
   v_dp_handle := DBMS_DATAPUMP.open (operation   => 'IMPORT'
                                    , job_mode    => 'TABLE'
                                    , job_name    => v_job_name
                                    , remote_link => v_db_link);

   -- No PARALLEL
   DBMS_DATAPUMP.set_parallel (handle => v_dp_handle, degree => 1);

   -- Consistent read of the source data as of the start of the job
   DBMS_DATAPUMP.SET_PARAMETER(
        handle => v_dp_handle
      , name   => 'FLASHBACK_TIME'
      , value  => 'systimestamp'
   );

   -- TABLE_EXISTS_ACTION -- : TRUNCATE, REPLACE, APPEND, and SKIP.
   DBMS_DATAPUMP.SET_PARAMETER(
        handle => v_dp_handle
      , name   => 'TABLE_EXISTS_ACTION'
      , value  => upper(p_mode)
   );

   -- import only this table; embedded quotes are doubled so that the name
   -- cannot break out of the string literal of the filter expression
   DBMS_DATAPUMP.metadata_filter (handle => v_dp_handle
                                , name   => 'NAME_EXPR'
                                , VALUE  => 'IN ('''||replace(p_tablename, '''', '''''')||''')');

   -- import from this schema
   DBMS_DATAPUMP.metadata_filter (handle => v_dp_handle
                                , name   => 'SCHEMA_EXPR'
                                , VALUE  => 'IN ('''||user||''')');

   -- Logfile
   DBMS_DATAPUMP.add_file (handle    => v_dp_handle
                          ,filename  => v_log_file_name
                          ,filetype  => DBMS_DATAPUMP.KU$_FILE_TYPE_LOG_FILE
                          ,directory => v_db_directory
                          ,reusefile => 1 -- overwrite existing files
                          ,filesize  => '10000M');

   -- Do it!
   DBMS_DATAPUMP.start_job (handle => v_dp_handle);
   v_job_started := TRUE;
   COMMIT;
   DBMS_DATAPUMP.detach (handle => v_dp_handle);
EXCEPTION
   WHEN OTHERS THEN
      -- A job that was opened but never started stays in state DEFINING and blocks
      -- the handle for this session; remove it before reporting the error.
      IF v_dp_handle IS NOT NULL AND NOT v_job_started THEN
         BEGIN
            DBMS_DATAPUMP.stop_job (handle => v_dp_handle, immediate => 1, keep_master => 0);
         EXCEPTION
            WHEN OTHERS THEN
               -- best effort: at least release the handle, the original error is re-raised below
               BEGIN
                  DBMS_DATAPUMP.detach (handle => v_dp_handle);
               EXCEPTION
                  WHEN OTHERS THEN
                     NULL;
               END;
         END;
      END IF;
      -- an autonomous transaction must be ended before control leaves the procedure
      ROLLBACK;
      RAISE;
END dp_import_table;
/
----
==== Probleme bei der Entwicklung ====
=== Problem ORA-31626: job does not exist ===
Beim ersten Aufruf erhalte ich den Fehler "ORA-31626: job does not exist".
Fehler:
ERROR at line 1:
ORA-31626: job does not exist
ORA-06512: at "SYS.DBMS_SYS_ERROR", line 79
ORA-06512: at "SYS.DBMS_DATAPUMP", line 1137
ORA-06512: at "SYS.DBMS_DATAPUMP", line 5285
Rechte?
Lösung:
-- Target DB
grant create table, create procedure to bestdba;
Der User benötigt auch die direkten Grants, eine Rolle ist nicht ausreichend, ist ja PL/SQL im Hintergrund!
siehe auch https://asktom.oracle.com/pls/apex/f?p=100:11:0::::P11_QUESTION_ID:9532917900346934390
=== Problem ORA-39001: invalid argument value ===
Fehler:
ORA-39001: invalid argument value
ORA-06512: at "SYS.DBMS_SYS_ERROR", line 79
ORA-06512: at "SYS.DBMS_DATAPUMP", line 3507
ORA-06512: at "SYS.DBMS_DATAPUMP", line 5296
Im ersten Schritt den DB Link nochmals prüfen, Name des DB Links im Script falsch vergeben 8-O.
Siehe auch => Error ORA-39001 When Using DBMS_DATAPUMP API Over A Network Link (Doc ID 1160207.1)
===Problem DBMS_DATAPUMP.ATTACH ORA-31626: job does not exist===
Ein Job läßt sich nicht mehr löschen, steht im Status "DEFINING", nun was tun??
Diese bestehende Session beenden und neu anmelden!
Schlägt etwas fehl, ist das Handle blockiert! Einfach abmelden und wieder anmelden und schon ist alles wieder gut!
----
==== Datapump Log File in der DB auslesen ====
Für das Auslesen des Logfiles wird eine External Table eingesetzt.
-- External table that exposes the Data Pump log file db_import_plsql.log
-- (directory object BACKUP) as one row per log line.
-- The DROP reports an error on the very first run, when the table does not exist yet.
-- The file has a single field per record, so the field terminator must be a character
-- that does not occur in the log text; with ',' every line was cut at its first comma.
-- NOTE(review): '~' is assumed not to appear in Data Pump log lines - confirm for your logs.
drop table DP_DUMP_LOG;
CREATE TABLE DP_DUMP_LOG (
   log_line VARCHAR2(4000)
)
ORGANIZATION EXTERNAL (
   TYPE ORACLE_LOADER
   DEFAULT DIRECTORY backup
   ACCESS PARAMETERS (
      RECORDS DELIMITED BY NEWLINE
      FIELDS TERMINATED BY '~'
      MISSING FIELD VALUES ARE NULL
      (
         log_line CHAR(4000)
      )
   )
   LOCATION ('db_import_plsql.log')
)
PARALLEL 1
REJECT LIMIT UNLIMITED;
Auswerten mit:
select * from DP_DUMP_LOG;
Starting "BESTDBA"."BESTDBA_IMPORT16_09":
Estimate in progress using BLOCKS method...
Processing object type TABLE_EXPORT/TABLE/TABLE_DATA
Total estimation using BLOCKS method: 10 MB
Processing object type TABLE_EXPORT/TABLE/TABLE
. . imported "BESTDBA"."T_ALL_OBJECTS" 86820 rows
Processing object type TABLE_EXPORT/TABLE/STATISTICS/TABLE_STATISTICS
Job "BESTDBA"."BESTDBA_IMPORT16_09" successfully completed at Sat Feb 16 09:24:44 2019 elapsed 0 00:00:03
----
==== Job in der DB wieder abbrechen ====
CREATE OR REPLACE PROCEDURE dp_import_stop_job(p_job_name varchar2)
is
--- +----------------------------------
--
-- Stops a Data Pump job of the calling user.
-- All jobs found in USER_DATAPUMP_JOBS are printed via DBMS_OUTPUT so that the
-- caller sees the candidates if the given name is wrong.
--
-- p_job_name : name of the job, compared case-insensitively
--
-- Raises -20000 if no job with this name is listed,
--        -20002 if attaching to the job fails,
--        -20003 if the job is listed but no longer exists (a leftover master
--               table with the job name is dropped in that case).
--
-- testcall exec dp_import_stop_job(p_job_name => 'MY_JOB')
--
-- +----------------------------------
   v_dp_handle NUMBER;
   cursor c_act_jobs is
      select job_name
           , operation
           , job_mode
           , state
           , attached_sessions
        from user_datapump_jobs
       where job_name not like 'BIN$%'
       order by job_name, operation
   ;
   v_job_exits      boolean := false;
   -- anchored to the dictionary columns so that long job names fit
   v_job_state      user_datapump_jobs.state%type;
   v_real_job_name  user_datapump_jobs.job_name%type;
   v_count          pls_integer;
   v_sts            ku$_Status;
   v_job_run_state  varchar2(2000);
BEGIN
   dbms_output.put_line(' -- Stop the Job Parameter ------------' );
   dbms_output.put_line(' -- p_job_name :: '|| p_job_name );

   -- query all actual jobs
   -- to show a list of candidates if job_name is wrong
   --
   for rec in c_act_jobs
   loop
      -- remember the match; rows that do not match must not reset the flag,
      -- otherwise the job is only found when it happens to be the last row
      if rec.job_name = upper(p_job_name) then
         v_job_exits     := true;
         v_real_job_name := rec.job_name;
         v_job_state     := rec.state;
      end if;
      dbms_output.put_line('--- Found this Job :: ' ||rec.job_name );
      dbms_output.put_line('+-- Operation :: ' ||rec.operation );
      dbms_output.put_line('+-- Mode :: ' ||rec.job_mode );
      dbms_output.put_line('+-- State :: ' ||rec.state );
      dbms_output.put_line('+-- Sessions :: ' ||rec.attached_sessions );
   end loop;

   if v_job_exits then
      begin
         -- Create Data Pump Handle - "ATTACH" in this case
         v_dp_handle := DBMS_DATAPUMP.ATTACH(
                             job_name  => v_real_job_name
                            ,job_owner => user);
      exception
         when DBMS_DATAPUMP.NO_SUCH_JOB then
            -- check if the old job table exists
            select count(*) into v_count from user_tables where upper(table_name) = upper(v_real_job_name);
            if v_count > 0 then
               execute immediate 'drop table '||user||'."'||v_real_job_name||'"';
            end if;
            RAISE_APPLICATION_ERROR (-20003, '-- Error :: Job Not running anymore, check for other errors - no mastertable for '||p_job_name || ' get Error '||SQLERRM);
         when others then
            RAISE_APPLICATION_ERROR (-20002, '-- Error :: Not possible to attach to the job - Error :: '||SQLERRM);
      end;

      if v_job_state in ('DEFINING') then
         -- check if the job is in the defining state!
         -- abnormal situation, normal stop not possible
         -- use DBMS_DATAPUMP.START_JOB to restart the job
         DBMS_DATAPUMP.START_JOB ( handle => v_dp_handle );
      end if;

      -- print the status
      dbms_datapump.get_status (handle    => v_dp_handle
                              , mask      => dbms_datapump.KU$_STATUS_WIP
                              , timeout   => 0
                              , job_state => v_job_run_state
                              , status    => v_sts
      );
      dbms_output.put_line('+-- Akt State :: ' ||v_job_run_state );

      -- Stop the job
      -- NOTE(review): keep_master => null leaves the decision about the master table to
      -- Data Pump's default; pass 0 if the master table must be dropped - confirm intent.
      DBMS_DATAPUMP.STOP_JOB (
           handle      => v_dp_handle
         , immediate   => 1    -- stop now
         , keep_master => null -- use the default handling of the master table
         , delay       => 5    -- wait 5 seconds before kill for other sessions
      );
   else
      RAISE_APPLICATION_ERROR (-20000, '-- Error :: This job name not found::'||p_job_name);
   end if;
end dp_import_stop_job;
/
----
==== Jobs in der DB kontrollieren ====
siehe auch https://github.com/gpipperr/OraPowerShell/blob/master/Ora_SQLPlus_SQLcL_sql_scripts/datapump.sql
--==============================================================================
-- GPI - Gunther Pippèrr
-- Desc: Get Information about running data pump jobs
-- Date: November 2013
--==============================================================================
-- SQL*Plus page layout and column formats for the first report
set linesize 130 pagesize 300
column owner_name format a10;
column job_name format a20
column state format a12
column operation like state
column job_mode like state
ttitle "Datapump Jobs" SKIP 2
-- Report 1: every Data Pump job known to the database, recycle-bin entries excluded
SELECT j.owner_name
     , j.job_name
     , j.operation
     , j.job_mode
     , j.state
     , j.attached_sessions
  FROM dba_datapump_jobs j
 WHERE j.job_name NOT LIKE 'BIN$%'
 ORDER BY j.owner_name, j.job_name
/
ttitle "Datapump Master Table" SKIP 2
column status format a10;
column object_id format 99999999
column object_type format a12
column OBJECT_NAME format a25
-- Report 2: database objects that carry the owner and name of a Data Pump job
SELECT o.status
     , o.object_id
     , o.object_type
     , o.owner||'.'||o.object_name AS OBJECT_NAME
  FROM dba_objects o
 INNER JOIN dba_datapump_jobs j
    ON j.owner_name = o.owner
   AND j.job_name = o.object_name
 WHERE j.job_name NOT LIKE 'BIN$%'
 ORDER BY o.owner||'.'||o.object_name, o.object_id
/
ttitle off
prompt ...
prompt ... check for "NOT RUNNING" Jobs
prompt ...
==== Quellen ====
* https://docs.oracle.com/cd/B28359_01/appdev.111/b28419/d_datpmp.htm#i998298
* https://docs.oracle.com/database/121/SUTIL/GUID-5AAC848B-5A2B-4FD1-97ED-D3A048263118.htm#SUTIL977
----
==== Code Beispiel für einen "normalen" DataPump Export Aufruf ====
-- Export GPIDB Database
SET SERVEROUTPUT ON
ACCEPT export_dir CHAR PROMPT 'Enter the name for the directory for the export of the database:'
-- Makes sure the directory object GPIDB_EXPORT exists and points to the path entered above.
-- An empty answer falls back to /orabackup.
-- NOTE(review): the answer is substituted by SQL*Plus into a string literal, so a path
-- containing a single quote breaks the block.
DECLARE
   CURSOR c_dir (p_dirname VARCHAR2)
   IS
      SELECT directory_path
        FROM dba_directories
       WHERE directory_name = p_dirname;
   v_dir     dba_directories.directory_path%TYPE;
   v_found   BOOLEAN;
   v_target  dba_directories.directory_path%TYPE := NVL (TRIM ('&&export_dir'), '/orabackup');
BEGIN
   DBMS_OUTPUT.put_line ('check for directory GPIDB_EXPORT');
   OPEN c_dir ('GPIDB_EXPORT');
   FETCH c_dir INTO v_dir;
   -- the attribute of the explicit cursor must be read; SQL%NOTFOUND belongs to the
   -- implicit cursor and is NULL here, so the create branch was never reached
   v_found := c_dir%FOUND;
   CLOSE c_dir;

   IF NOT v_found
   THEN
      DBMS_OUTPUT.put_line ('create directory GPIDB_EXPORT');
      EXECUTE IMMEDIATE 'create directory GPIDB_export as ''' || REPLACE (v_target, '''', '''''') || '''';
   ELSIF v_dir <> v_target
   THEN
      -- plain inequality: LIKE would treat _ and % in the path as wildcards
      DBMS_OUTPUT.put_line ('relink directory GPIDB_EXPORT');
      -- create or replace keeps the grants on the directory, drop + create loses them
      EXECUTE IMMEDIATE 'create or replace directory GPIDB_export as ''' || REPLACE (v_target, '''', '''''') || '''';
   END IF;
END;
/
SELECT DIRECTORY_PATH
FROM dba_directories
WHERE DIRECTORY_NAME = 'GPIDB_EXPORT';
--- Start datapump to export the database
CREATE OR REPLACE PROCEDURE db_export_GPIDB
-- +----------------------------------
--
-- Starts a Data Pump export of the schema GPIDB into the directory object
-- GPIDB_EXPORT with four parallel workers.
-- The job is only started here; it keeps running after the procedure returns.
-- Dump and log file share one timestamp in their names.
--
-- Runs as an autonomous transaction, so it always ends with COMMIT or ROLLBACK.
-- Any error is re-raised after the Data Pump handle has been released.
--
-- +----------------------------------
IS
   PRAGMA AUTONOMOUS_TRANSACTION;
   v_dp_handle    NUMBER;
   -- TRUE once start_job succeeded; from then on the job must not be stopped by the cleanup
   v_job_started  BOOLEAN := FALSE;
   -- evaluated once so that job name, dump file and log file belong together
   v_now          DATE := SYSDATE;
   v_file_prefix  VARCHAR2(64);
BEGIN
   v_file_prefix := 'GPIDB_' || TO_CHAR (v_now, 'YYYYMMDD-HH24MISS');

   -- Create Data Pump Handle - schema mode "EXPORT" in this case
   v_dp_handle :=
      DBMS_DATAPUMP.open (operation => 'EXPORT'
                        , job_mode  => 'SCHEMA'
                        , job_name  => 'GPIDB_EXPORT2' || TO_CHAR (v_now, 'DD_HH24'));

   DBMS_DATAPUMP.set_parallel (handle => v_dp_handle, degree => 4);

   -- Export the complete schema
   DBMS_DATAPUMP.metadata_filter (handle => v_dp_handle, name => 'SCHEMA_EXPR', VALUE => 'IN (''GPIDB'')');

   -- Specify target file - make it unique with a timestamp
   DBMS_DATAPUMP.add_file (handle    => v_dp_handle
                          ,filename  => v_file_prefix || '%U.dmp'
                          ,directory => 'GPIDB_EXPORT'
                          ,reusefile => 1 -- overwrite existing files
                          ,filesize  => '50000M');

   -- Logfile
   DBMS_DATAPUMP.add_file (handle    => v_dp_handle
                          ,filename  => v_file_prefix || '.log'
                          ,filetype  => DBMS_DATAPUMP.KU$_FILE_TYPE_LOG_FILE
                          ,directory => 'GPIDB_EXPORT'
                          ,reusefile => 1 -- overwrite existing files
                          ,filesize  => '10000M');

   -- Optional: PARTITION_OPTIONS = MERGE re-creates each partitioned table in the
   -- target database as an unpartitioned table (not set here).

   -- Do it!
   DBMS_DATAPUMP.start_job (handle => v_dp_handle);
   v_job_started := TRUE;
   COMMIT;
   -- release the handle; the job itself keeps running
   DBMS_DATAPUMP.detach (handle => v_dp_handle);
EXCEPTION
   WHEN OTHERS THEN
      -- A job that was opened but never started stays in state DEFINING and blocks
      -- the handle for this session; remove it before reporting the error.
      IF v_dp_handle IS NOT NULL AND NOT v_job_started THEN
         BEGIN
            DBMS_DATAPUMP.stop_job (handle => v_dp_handle, immediate => 1, keep_master => 0);
         EXCEPTION
            WHEN OTHERS THEN
               -- best effort: at least release the handle, the original error is re-raised below
               BEGIN
                  DBMS_DATAPUMP.detach (handle => v_dp_handle);
               EXCEPTION
                  WHEN OTHERS THEN
                     NULL;
               END;
         END;
      END IF;
      -- an autonomous transaction must be ended before control leaves the procedure
      ROLLBACK;
      RAISE;
END db_export_GPIDB;
/
-- Print a start message, then kick off the export job.
DECLARE
   v_started_at VARCHAR2(20) := TO_CHAR (SYSDATE, 'dd.mm HH24:MI');
BEGIN
   DBMS_OUTPUT.put_line (' create export at ' || v_started_at);
   db_export_GPIDB;
END;
/
-- Show the expdp command line that attaches to the job of the current day and hour.
PROMPT "to attach to the job please use:"
SELECT 'expdp "''/ as sysdba''" attach=GPIDB_EXPORT2' || TO_CHAR (SYSDATE, 'DD_HH24')
  FROM DUAL;
TTITLE OFF