Tap PostgreSQL
The underlying Singer tap is pipelinewise-tap-postgres.
PostgreSQL setup requirements
Step 1: Check if you have all the required credentials for replicating data from PostgreSQL
CREATEROLE or SUPERUSER privilege - Either permission is required to create a database user for PipelineWise.
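If you are unsure whether the login you plan to use has either privilege, a quick check against the pg_roles catalog will tell you (run it as the user in question):

SELECT rolname, rolcreaterole, rolsuper
FROM pg_roles
WHERE rolname = current_user;

If both rolcreaterole and rolsuper come back false, ask your administrator to grant one of them before continuing.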
Step 2: Create a PipelineWise database user
Next, you’ll create a dedicated database user for PipelineWise. Create a new user and grant the required permissions on the database, schema and tables that you want to replicate:
CREATE USER pipelinewise WITH ENCRYPTED PASSWORD '<password>';
GRANT CONNECT ON DATABASE <database_name> TO pipelinewise;
GRANT USAGE ON SCHEMA <schema_name> TO pipelinewise;
GRANT SELECT ON ALL TABLES IN SCHEMA <schema_name> TO pipelinewise;
In order for the pipelinewise user to automatically gain access to any tables created in the future, we recommend running the following query:
ALTER DEFAULT PRIVILEGES IN SCHEMA <schema_name> GRANT SELECT ON TABLES TO pipelinewise;
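To verify that the grants are in place, you can use PostgreSQL's built-in privilege functions. This is just a sanity check; <database_name>, <schema_name> and <table_name> are placeholders for your own objects:

SELECT has_database_privilege('pipelinewise', '<database_name>', 'CONNECT');
SELECT has_schema_privilege('pipelinewise', '<schema_name>', 'USAGE');
SELECT has_table_privilege('pipelinewise', '<schema_name>.<table_name>', 'SELECT');

Each query should return true.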
Step 3: Configure Log-based Incremental Replication
Note
This step is only required if you use the Log Based replication method.
Warning
Log Based for PostgreSQL-based databases requires:
PostgreSQL databases running PostgreSQL versions 9.4.x or greater.
To avoid a critical PostgreSQL bug, use at least the following minor version for your PostgreSQL release:
PostgreSQL 12.0
PostgreSQL 11.2
PostgreSQL 10.7
PostgreSQL 9.6.12
PostgreSQL 9.5.16
PostgreSQL 9.4.21
A connection to the master instance. Log-based replication will only work by connecting to the master instance.
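If you are unsure whether the connection you are using points at the master (primary) instance, you can check from a SQL session; pg_is_in_recovery() returns false on a primary and true on a standby/read replica:

SELECT pg_is_in_recovery();   -- false means you are connected to the primary (master) instance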
Step 3.1: Install the wal2json plugin
To use Log Based for your PostgreSQL integration, you must install the wal2json plugin that has support for format-version=2 (wal2json >= 2.3). The wal2json plugin outputs JSON objects for logical decoding, which the tap then uses to perform Log-based Replication.
Steps for installing the plugin vary depending on your operating system. Instructions for each operating system are in the wal2json GitHub repository.
After you've installed the plugin, you can move on to the next step.
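Once wal_level has been set to logical and the service restarted (Steps 3.2 and 3.3 below), you can confirm the plugin is usable by creating and immediately dropping a throwaway logical replication slot. The slot name below is arbitrary, and the check requires the SUPERUSER or REPLICATION privilege:

SELECT * FROM pg_create_logical_replication_slot('wal2json_check', 'wal2json');
SELECT pg_drop_replication_slot('wal2json_check');

If the first statement fails with an error referring to the wal2json output plugin, the plugin is not visible to the server and the installation needs to be revisited.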
Step 3.2: Edit the database configuration file
Locate the database configuration file (usually postgresql.conf) and define the parameters as follows:
wal_level=logical
max_replication_slots=5
max_wal_senders=5
Note: For max_replication_slots and max_wal_senders, we're defaulting to a value of 5.
This should be sufficient unless you have a large number of read replicas connected to the master instance.
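If you are not sure where postgresql.conf lives on your server, PostgreSQL can tell you (superuser, or membership in pg_read_all_settings, may be required):

SHOW config_file;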
Step 3.3: Restart the PostgreSQL service
Restart your PostgreSQL service to ensure the changes take effect.
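After the restart, you can confirm from any SQL session that the new settings are in effect:

SHOW wal_level;              -- expected: logical
SHOW max_replication_slots;  -- expected: 5, or whatever value you configured
SHOW max_wal_senders;        -- expected: 5, or whatever value you configured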
Step 3.4: Replication slot
In PostgreSQL, a logical replication slot represents a stream of database changes that can then be replayed to a client in the order they were made on the original server. Each slot streams a sequence of changes from a single database.
PipelineWise automatically creates a dedicated logical replication slot for each database and tap.
Note
wal2json is required to use Log Based in PipelineWise for PostgreSQL-backed databases.
Note
In case of a full resync of a whole tap, PipelineWise will attempt to drop the slot.
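To inspect the logical replication slots that currently exist on the server, for example to confirm what PipelineWise has created or to remove a stale slot by hand, query pg_replication_slots:

SELECT slot_name, plugin, slot_type, database, active
FROM pg_replication_slots;
-- Drop a slot manually only if you are certain no tap is still using it:
-- SELECT pg_drop_replication_slot('<slot_name>');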
Configuring what to replicate
PipelineWise configures every tap with a common structured YAML file format. A sample YAML for Postgres replication can be generated into a project directory by following the steps in the Generating Sample Pipelines section.
Example YAML for tap-postgres:
---
# ------------------------------------------------------------------------------
# General Properties
# ------------------------------------------------------------------------------
id: "postgres_sample" # Unique identifier of the tap
name: "Sample Postgres Database" # Name of the tap
type: "tap-postgres" # !! THIS SHOULD NOT CHANGE !!
owner: "somebody@foo.com" # Data owner to contact
#send_alert: False # Optional: Disable all configured alerts on this tap
#slack_alert_channel: "#tap-channel" # Optional: Sending a copy of specific tap alerts to this slack channel
# ------------------------------------------------------------------------------
# Source (Tap) - PostgreSQL connection details
# ------------------------------------------------------------------------------
db_conn:
  host: "<HOST>"                      # PostgreSQL host
  port: 5432                          # PostgreSQL port
  user: "<USER>"                      # PostgreSQL user
  password: "<PASSWORD>"              # Plain string or vault encrypted
  dbname: "<DB_NAME>"                 # PostgreSQL database name
  #filter_schemas: "schema1,schema2"  # Optional: Scan only the required schemas
                                      #           to improve the performance of
                                      #           data extraction
  #max_run_seconds:                   # Optional: Stop running the tap after a certain
                                      #           number of seconds
                                      #           Default: 43200
  #logical_poll_total_seconds:        # Optional: Stop running the tap when no data has been
                                      #           received from wal after a certain number of seconds
                                      #           Default: 10800
  #break_at_end_lsn:                  # Optional: Stop running the tap if the newly received lsn
                                      #           is after the max lsn that was detected when the tap started
                                      #           Default: true
  #ssl: "true"                        # Optional: Using SSL via postgres sslmode 'require' option.
                                      #           If the server does not accept SSL connections or the client
                                      #           certificate is not recognized the connection will fail
  fastsync_parallelism: <int>         # Optional: size of multiprocessing pool used by FastSync
                                      #           Min: 1
                                      #           Default: number of CPU cores
  #limit: 50000                       # Optional: limit to add to incremental queries, this is useful
                                      #           to avoid long running transactions on the DB
# ------------------------------------------------------------------------------
# Destination (Target) - Target properties
# Connection details should be in the relevant target YAML file
# ------------------------------------------------------------------------------
target: "snowflake" # ID of the target connector where the data will be loaded
batch_size_rows: 20000 # Batch size for the stream to optimise load performance
stream_buffer_size: 0 # In-memory buffer size (MB) between taps and targets for asynchronous data pipes
#batch_wait_limit_seconds: 3600 # Optional: Maximum time to wait for `batch_size_rows`. Available only for snowflake target.
# Options only for Snowflake target
#split_large_files: False # Optional: split large files to multiple pieces and create multipart zip files. (Default: False)
#split_file_chunk_size_mb: 1000 # Optional: File chunk sizes if `split_large_files` enabled. (Default: 1000)
#split_file_max_chunks: 20 # Optional: Max number of chunks if `split_large_files` enabled. (Default: 20)
#archive_load_files: False # Optional: when enabled, the files loaded to Snowflake will also be stored in `archive_load_files_s3_bucket`
#archive_load_files_s3_prefix: "archive" # Optional: When `archive_load_files` is enabled, the archived files will be placed in the archive S3 bucket under this prefix.
#archive_load_files_s3_bucket: "<BUCKET_NAME>" # Optional: When `archive_load_files` is enabled, the archived files will be placed in this bucket. (Default: the value of `s3_bucket` in target snowflake YAML)
# ------------------------------------------------------------------------------
# Source to target Schema mapping
# ------------------------------------------------------------------------------
schemas:

  - source_schema: "public"               # Source schema in postgres with tables
    target_schema: "repl_pg_public"       # Target schema in the destination Data Warehouse
    target_schema_select_permissions:     # Optional: Grant SELECT on schema and tables that created
      - grp_stats

    # List of tables to replicate from Postgres to destination Data Warehouse
    #
    # Please check the Replication Strategies section in the documentation to understand the differences.
    # For LOG_BASED replication method you might need to adjust the source PostgreSQL configuration.
    tables:
      - table_name: "table_one"
        replication_method: "INCREMENTAL" # One of INCREMENTAL, LOG_BASED and FULL_TABLE
        replication_key: "last_update"    # Important: Incremental load always needs replication key

        # OPTIONAL: Load time transformations
        #transformations:
        #  - column: "last_name"          # Column to transform
        #    type: "SET-NULL"             # Transformation type

      # You can add as many tables as you need...
      - table_name: "table_two"
        replication_method: "LOG_BASED"   # Important! Log based must be enabled in PostgreSQL

      - table_name: "table_three"
        replication_method: "LOG_BASED"
        sync_start_from:                  # Optional, applies to the first sync and fast sync
          column: "column_name"           # Column name to be picked for partial sync with incremental or timestamp value
          static_value: "start_value"     # A static value; the first sync always starts from column >= static_value
          drop_target_table: true         # Optional, drops target table before syncing. Default value is false

      - table_name: "table_four"
        replication_method: "LOG_BASED"
        sync_start_from:                  # Optional, applies to the first sync and fast sync
          column: "column_name"           # Column name to be picked for partial sync with incremental or timestamp value
          dynamic_value: "A SELECT query" # A valid PG SELECT query that returns a single row with a single
                                          # column; the first sync always starts from column >= dynamic_value
          drop_target_table: true         # Optional, drops target table before syncing. Default value is false

      # Note: static_value and dynamic_value cannot be defined together for a table; only one of them can be used.

  # You can add as many schemas as you need...
  # Uncomment this if you want to replicate tables from multiple schemas
  #- source_schema: "another_schema_in_postgres"
  #  target_schema: "another_target_schema"