.. _tap-postgres:

Tap PostgreSQL
--------------

The underlying Singer tap is `pipelinewise-tap-postgres <https://github.com/transferwise/pipelinewise-tap-postgres>`_.

PostgreSQL setup requirements
'''''''''''''''''''''''''''''

**Step 1: Check if you have all the required credentials for replicating data from PostgreSQL**

* ``CREATEROLE`` or ``SUPERUSER`` privilege - Either privilege is required to create a database user for PipelineWise.

**Step 2: Create a PipelineWise database user**

Next, create a dedicated database user for PipelineWise and grant it the required permissions on the database, schema and tables that you want to replicate:

* ``CREATE USER pipelinewise WITH ENCRYPTED PASSWORD '<password>'``
* ``GRANT CONNECT ON DATABASE <database_name> TO pipelinewise``
* ``GRANT USAGE ON SCHEMA <schema_name> TO pipelinewise``
* ``GRANT SELECT ON ALL TABLES IN SCHEMA <schema_name> TO pipelinewise``

In order for the ``pipelinewise`` user to automatically have access to any tables created in the future, we recommend also running the following query:

``ALTER DEFAULT PRIVILEGES IN SCHEMA <schema_name> GRANT SELECT ON TABLES TO pipelinewise;``

**Step 3: Configure Log-based Incremental Replication**

.. note::

    This step is only required if you use the :ref:`log_based` replication method.

.. warning::

    :ref:`log_based` replication for PostgreSQL-based databases requires:

    * **PostgreSQL databases running PostgreSQL versions 9.4.x or greater.**

    * **To avoid a critical PostgreSQL bug, use at least one of the following minor versions:**

      * PostgreSQL 12.0
      * PostgreSQL 11.2
      * PostgreSQL 10.7
      * PostgreSQL 9.6.12
      * PostgreSQL 9.5.16
      * PostgreSQL 9.4.21

    * **A connection to the master instance.** Log-based replication only works when connecting to the master instance.

**Step 3.1: Install the wal2json plugin**

To use :ref:`log_based` for your PostgreSQL integration, you must install the `wal2json <https://github.com/eulerto/wal2json>`_ plugin with support for ``format-version=2`` (wal2json >= 2.3). The wal2json plugin outputs JSON objects for logical decoding, which the tap then uses to perform Log-based Replication.

Steps for installing the plugin vary depending on your operating system. Instructions for each operating system type are in wal2json's GitHub repository:

* `Unix-based operating systems <https://github.com/eulerto/wal2json#unix-based-operating-systems>`_
* `Windows <https://github.com/eulerto/wal2json#windows>`_

After you've installed the plugin, you can move on to the next step.

**Step 3.2: Edit the database configuration file**

Locate the database configuration file (usually ``postgresql.conf``) and define the parameters as follows:

.. code-block:: bash

    wal_level=logical
    max_replication_slots=5
    max_wal_senders=5

**Note**: For ``max_replication_slots`` and ``max_wal_senders``, we're defaulting to a value of 5. This should be sufficient unless you have a large number of read replicas connected to the master instance.

**Step 3.3: Restart the PostgreSQL service**

Restart your PostgreSQL service to ensure the changes take effect.

**Step 3.4: Replication slot**

In PostgreSQL, a logical replication slot represents a stream of database changes that can be replayed to a client in the order they were made on the original server. Each slot streams a sequence of changes from a single database. PipelineWise automatically creates a dedicated logical replication slot for each database and tap.

.. note::

    ``wal2json`` is required to use :ref:`log_based` in PipelineWise for PostgreSQL-backed databases.

.. note::

    In case of a full resync of a whole tap, PipelineWise will attempt to drop the slot.
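Once the service is back up, you can sanity-check Steps 3.1-3.3 before configuring the tap. The following is a minimal sketch, assuming you can connect as a superuser (or a role with the ``REPLICATION`` attribute); ``<host>``, ``<superuser>`` and ``<database_name>`` are placeholders, and ``pipelinewise_test`` is a throwaway slot name used only for this check:

.. code-block:: bash

    # Verify the postgresql.conf settings took effect after the restart
    psql -h <host> -U <superuser> -d <database_name> <<'SQL'
    SHOW wal_level;               -- expected: logical
    SHOW max_replication_slots;   -- expected: 5
    SHOW max_wal_senders;         -- expected: 5

    -- Confirm the wal2json plugin loads by creating and then dropping a test slot
    SELECT * FROM pg_create_logical_replication_slot('pipelinewise_test', 'wal2json');
    SELECT pg_drop_replication_slot('pipelinewise_test');

    -- List the slots that exist (PipelineWise creates one per database and tap)
    SELECT slot_name, plugin, active FROM pg_replication_slots;
    SQL

If the ``pg_create_logical_replication_slot`` call fails, the wal2json plugin is not installed correctly or logical decoding is not enabled.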
Configuring what to replicate
'''''''''''''''''''''''''''''

PipelineWise configures every tap with a common, structured YAML file format. A sample YAML for PostgreSQL replication can be generated into a project directory by following the steps in the :ref:`generating_pipelines` section.

Example YAML for ``tap-postgres``:

.. code-block:: yaml

    ---

    # ------------------------------------------------------------------------------
    # General Properties
    # ------------------------------------------------------------------------------
    id: "postgres_sample"                  # Unique identifier of the tap
    name: "Sample Postgres Database"       # Name of the tap
    type: "tap-postgres"                   # !! THIS SHOULD NOT CHANGE !!
    owner: "somebody@foo.com"              # Data owner to contact
    #send_alert: False                     # Optional: Disable all configured alerts on this tap
    #slack_alert_channel: "#tap-channel"   # Optional: Sending a copy of specific tap alerts to this slack channel


    # ------------------------------------------------------------------------------
    # Source (Tap) - PostgreSQL connection details
    # ------------------------------------------------------------------------------
    db_conn:
      host: "<host>"                       # PostgreSQL host
      port: 5432                           # PostgreSQL port
      user: "<user>"                       # PostgreSQL user
      password: "<password>"               # Plain string or vault encrypted
      dbname: "<dbname>"                   # PostgreSQL database name
      #filter_schemas: "schema1,schema2"   # Optional: Scan only the required schemas
                                           #           to improve the performance of
                                           #           data extraction
      #max_run_seconds: 43200              # Optional: Stop running the tap after a certain
                                           #           number of seconds
                                           #           Default: 43200
      #logical_poll_total_seconds: 10800   # Optional: Stop running the tap when no data is
                                           #           received from the wal after a certain
                                           #           number of seconds
                                           #           Default: 10800
      #break_at_end_lsn: true              # Optional: Stop running the tap if the newly received lsn
                                           #           is after the max lsn that was detected when the
                                           #           tap started
                                           #           Default: true
      #ssl: "true"                         # Optional: Use SSL via the postgres sslmode 'require' option.
                                           #           If the server does not accept SSL connections or the
                                           #           client certificate is not recognized, the connection
                                           #           will fail
      #fastsync_parallelism: <int>         # Optional: Size of the multiprocessing pool used by FastSync
                                           #           Min: 1
                                           #           Default: number of CPU cores
      #limit: 50000                        # Optional: Limit to add to incremental queries; useful to
                                           #           avoid long running transactions on the DB


    # ------------------------------------------------------------------------------
    # Destination (Target) - Target properties
    # Connection details should be in the relevant target YAML file
    # ------------------------------------------------------------------------------
    target: "snowflake"                    # ID of the target connector where the data will be loaded
    batch_size_rows: 20000                 # Batch size for the stream to optimise load performance
    stream_buffer_size: 0                  # In-memory buffer size (MB) between taps and targets for asynchronous data pipes
    #batch_wait_limit_seconds: 3600        # Optional: Maximum time to wait for `batch_size_rows`. Available only for the snowflake target.

    # Options only for Snowflake target
    #split_large_files: False              # Optional: Split large files into multiple pieces and create multipart zip files. (Default: False)
    #split_file_chunk_size_mb: 1000        # Optional: File chunk sizes if `split_large_files` is enabled. (Default: 1000)
    #split_file_max_chunks: 20             # Optional: Max number of chunks if `split_large_files` is enabled. (Default: 20)
    #archive_load_files: False             # Optional: When enabled, the files loaded to Snowflake will also be stored in `archive_load_files_s3_bucket`
    #archive_load_files_s3_prefix: "archive" # Optional: When `archive_load_files` is enabled, the archived files will be placed in the archive S3 bucket under this prefix.
    #archive_load_files_s3_bucket: "<bucket_name>" # Optional: When `archive_load_files` is enabled, the archived files will be placed in this bucket. (Default: the value of `s3_bucket` in the target snowflake YAML)


    # ------------------------------------------------------------------------------
    # Source to target Schema mapping
    # ------------------------------------------------------------------------------
    schemas:

      - source_schema: "public"            # Source schema in postgres with tables
        target_schema: "repl_pg_public"    # Target schema in the destination Data Warehouse
        target_schema_select_permissions:  # Optional: Grant SELECT on the schema and tables that are created
          - grp_stats

        # List of tables to replicate from Postgres to destination Data Warehouse
        #
        # Please check the Replication Strategies section in the documentation to understand the differences.
        # For the LOG_BASED replication method you might need to adjust the source postgres configuration.
        tables:
          - table_name: "table_one"
            replication_method: "INCREMENTAL"   # One of INCREMENTAL, LOG_BASED and FULL_TABLE
            replication_key: "last_update"      # Important: Incremental load always needs a replication key

            # OPTIONAL: Load time transformations
            #transformations:
            #  - column: "last_name"            # Column to transform
            #    type: "SET-NULL"               # Transformation type

          # You can add as many tables as you need...
          - table_name: "table_two"
            replication_method: "LOG_BASED"     # Important! Log based replication must be enabled in PostgreSQL

          - table_name: "table_three"
            replication_method: "LOG_BASED"
            sync_start_from:                    # Optional, applies to the first sync and fast sync
              column: "column_name"             # Column name to be picked for partial sync with an incremental or timestamp value
              static_value: "start_value"       # A static value; the first sync always starts from column >= static_value
              drop_target_table: true           # Optional, drops the target table before syncing. Default value is false

          - table_name: "table_four"
            replication_method: "LOG_BASED"
            sync_start_from:                    # Optional, applies to the first sync and fast sync
              column: "column_name"             # Column name to be picked for partial sync with an incremental or timestamp value
              dynamic_value: "A SELECT query"   # A valid PG SELECT query which returns only one row with one
                                                # column; the first sync always starts from column >= dynamic_value
              drop_target_table: true           # Optional, drops the target table before syncing. Default value is false

      # You can add as many schemas as you need...
      # Uncomment this if you want to replicate tables from multiple schemas
      #- source_schema: "another_schema_in_postgres"
      #  target_schema: "another_target_schema"

.. note::

    ``static_value`` and ``dynamic_value`` cannot be defined together for a table; only one of them can be used.
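Once the YAML file is saved in your project directory, the usual next step is to import the project and trigger a manual run to confirm the connection and replication settings work end to end. A minimal sketch, assuming a project directory named ``pipelinewise_samples`` (a placeholder for your own directory) and the ``id`` and ``target`` values from the example above:

.. code-block:: bash

    # Import (or re-import) the project directory containing the YAML files
    pipelinewise import --dir pipelinewise_samples

    # Run this tap once, outside of any scheduler, to verify the setup
    pipelinewise run_tap --tap postgres_sample --target snowflake

    # Inspect the state of every configured pipeline
    pipelinewise status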