Tap MongoDB

MongoDB setup requirements

Step 1: Check if you have all the required credentials for replicating data from MongoDB

  • The user must have one of the following roles: read, readWrite, readAnyDatabase, readWriteAnyDatabase, dbOwner, backup or root. These roles allow PipelineWise to discover and read from the databases it syncs from.

  • If privileges are granted instead of a role, the user must have at least the find and changeStream actions, because these are the actions PipelineWise performs while syncing. A quick way to verify this is shown below.
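As a sanity check, you can inspect the roles and privileges of the currently authenticated user from the mongo shell. This is a minimal sketch, assuming you are connected as the user that PipelineWise will use:

// Shows the authenticated user's roles and effective privileges;
// look for the find and changeStream actions in authenticatedUserPrivileges.
db.runCommand({ "connectionStatus": 1, "showPrivileges": true });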

Step 2: Required database server settings

Note

This step is only required if you use the LOG_BASED replication method.

Warning

To use log_based replication, your MongoDB server must be running MongoDB version 3.6 or greater, must be either a replica set or a sharded cluster, and must have majority read concern enabled.

The log_based replication method makes use of Change Streams, which were introduced in version 3.6. For more info on Change Streams, head over to the official MongoDB documentation.
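A quick way to confirm these prerequisites from the mongo shell is sketched below; these are standard shell helpers, and supportsCommittedReads reports whether the storage engine supports majority read concern:

// Server version must be 3.6 or greater
db.version();

// Must be part of a replica set or sharded cluster; this errors on a standalone server
rs.status();

// Should be true: the storage engine supports majority read concern
db.serverStatus().storageEngine.supportsCommittedReads;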

Step 3: Create a PipelineWise database user

Next, you’ll create a dedicated user for PipelineWise. The user needs to have:

  • One of the roles read, readWrite, readAnyDatabase, readWriteAnyDatabase, dbOwner, backup or root on the database that you want to replicate.

  • find & changeStream privileges on every collection that you want to replicate.

Example:

db.createRole({
        "role" : "PipelineWiseRole",
        "privileges" : [{
                "resource" : {
                        "db" : "my_db",
                        "collection" : "my_collection"
                },
                "actions" : ["find", "changeStream"]
        }],
        "roles" : [{"role": "read", "db": "my_db"}]
});

db.createUser({
        "user" : "PipelineWiseUser",
        "pwd" : "mY_VerY_StRonG_PaSSwoRd",
        "roles" : ["PipelineWiseRole"]
});
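To check that the new user can perform the required actions, you could connect as PipelineWiseUser and run a find and open a change stream on one of the collections. This is only a sketch, reusing the my_db and my_collection names from the example above:

// Run against my_db while authenticated as PipelineWiseUser
db.my_collection.find().limit(1);   // exercises the find action
db.my_collection.watch();           // exercises the changeStream action (needs a replica set or sharded cluster)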

Configuring what to replicate

PipelineWise configures every tap with a common structured YAML file format. A sample YAML for MongoDB replication can be generated into a project directory by following the steps in the Generating Sample Pipelines section.

Example YAML for tap-mongodb:

    ---

    # ------------------------------------------------------------------------------
    # General Properties
    # ------------------------------------------------------------------------------
    id: "tap_mongo"
    name: "MongoDB tap"
    type: "tap-mongodb"
    owner: "foo@bar.com"
    #send_alert: False                     # Optional: Disable all configured alerts on this tap
    #slack_alert_channel: "#tap-channel"   # Optional: Sending a copy of specific tap alerts to this slack channel


    # ------------------------------------------------------------------------------
    # Source (Tap) - Mongo connection details
    # ------------------------------------------------------------------------------
    db_conn:
            host: "mongodb_host1,mongodb_host2,mongodb_host3"       # Mongodb host(s)
            port: 27017                                                     # Mongodb port
            srv: "false"                                                                            # For MongoDB Atlas `srv` should be "true" and `port` will be ignored
            user: "PipelineWiseUser"                                        # Mongodb user
            password: "mY_VerY_StRonG_PaSSwoRd"                 # Mongodb plain string or vault encrypted
            auth_database: "admin"                                                  # Mongodb database to authenticate on
            dbname: "my_db"                                                                 # Mongodb database name to sync from
            replica_set: "my_replica_set"                                   # Optional, Mongodb replica set name, default null
            write_batch_rows: <int>                                                         # Optional: Number of rows to write to csv file
                                                                            #           in one batch. Default is 50000.
    update_buffer_size: <int>                                                   # Optional: [LOG_BASED] The size of the buffer that holds detected update
                                                        #           operations in memory, the buffer is flushed once the size is reached. Default is 1.
    await_time_ms: <int>                                                            # Optional: [LOG_BASED] The maximum amount of time in milliseconds
                                                        #           the loge_base method waits for new data changes before exiting. Default is 1000 ms.
    fastsync_parallelism: <int>                         # Optional: size of multiprocessing pool used by FastSync
                                                        #           Min: 1
                                                        #           Default: number of CPU cores
    # ------------------------------------------------------------------------------
    # Destination (Target) - Target properties
    # Connection details should be in the relevant target YAML file
    # ------------------------------------------------------------------------------
    target: "my_target"                                     # ID of the target connector where the data will be loaded
    batch_size_rows: 1000                                   # Batch size for the stream to optimise load performance
    stream_buffer_size: 0                           # In-memory buffer size (MB) between taps and targets for asynchronous data pipes
    #batch_wait_limit_seconds: 3600                 # Optional: Maximum time to wait for `batch_size_rows`. Available only for snowflake target.

# Options only for Snowflake target
#split_large_files: False                       # Optional: split large files to multiple pieces and create multipart zip files. (Default: False)
#split_file_chunk_size_mb: 1000                 # Optional: File chunk sizes if `split_large_files` enabled. (Default: 1000)
#split_file_max_chunks: 20                      # Optional: Max number of chunks if `split_large_files` enabled. (Default: 20)
#archive_load_files: False                      # Optional: when enabled, the files loaded to Snowflake will also be stored in `archive_load_files_s3_bucket`
#archive_load_files_s3_prefix: "archive"        # Optional: When `archive_load_files` is enabled, the archived files will be placed in the archive S3 bucket under this prefix.
#archive_load_files_s3_bucket: "<BUCKET_NAME>"  # Optional: When `archive_load_files` is enabled, the archived files will be placed in this bucket. (Default: the value of `s3_bucket` in target snowflake YAML)


    # ------------------------------------------------------------------------------
    # Source to target Schema mapping
    # ------------------------------------------------------------------------------
    schemas:
            - source_schema: "my_db"                                        # Same name as dbname
              target_schema: "ppw_e2e_tap_mongodb"          # Name of target schema to load to

              # List of collections to sync
              tables:
                    - table_name: "my_collection"
                      replication_method: "FULL_TABLE"

                    # default replication method is LOG_BASED
                    - table_name: "my_other_collection"

Example connection to MongoDB Atlas

db_conn:
        srv: "true"
        host: "xxxxxxxxx.xxxxx.mongodb.net"
        auth_database: "admin"                  # the Mongodb database name to authenticate on
        dbname: "db-name"                               # Mongodb database name to sync from
        user: "user-name"                               # User with read roles
        password: "password"                    # Plain string or vault encrypted