Class RdePipeline

All Implemented Interfaces:

@Singleton public class RdePipeline extends Object implements Serializable
Definition of a Dataflow Flex template, which generates RDE/BRDA deposits.

To stage this template locally, run ./nom_build :core:sBP --environment=alpha --pipeline=rde.

Then, you can run the staged template via the API client library, gCloud or a raw REST call.

This pipeline only works for pending deposits with the same watermark, the RdeStagingAction will batch such pending deposits together and launch multiple pipelines if multiple watermarks exist.

The pipeline is broadly divided into two parts -- creating the DepositFragments, and processing them.

Creating DepositFragment


Non-test registrar entities are loaded from Cloud SQL and marshalled into deposit fragments. They are NOT rewound to the watermark.


All EPP resources are loaded from the corresponding HistoryEntry, which has the resource embedded. In general, we find most recent history entry before watermark and filter out the ones that are soft-deleted by watermark. The history is emitted as pairs of (resource repo ID: history revision ID) from the SQL query.


After the most recent (live) domain resources are loaded from the corresponding history objects, we marshall them to deposit fragments and emit the (pending deposit: deposit fragment) pairs for further processing. We also find all the contacts and hosts referenced by a given domain and emit pairs of (contact/host repo ID: pending deposit) for all RDE pending deposits for further processing.


We first join most recent contact histories, represented by (contact repo ID: contact history revision ID) pairs, with referenced contacts, represented by (contact repo ID: pending deposit) pairs, on the contact repo ID, to remove unreferenced contact histories. Contact resources are then loaded from the remaining referenced contact histories, and marshalled into (pending deposit: deposit fragment) pairs.


Similar to Contact, we join the most recent host history with referenced hosts to find most recent referenced hosts. For external hosts we do the same treatment as we did on contacts and obtain the (pending deposit: deposit fragment) pairs. For subordinate hosts, we need to find the superordinate domain in order to properly handle pending transfer in the deposit as well. So we first find the superordinate domain repo ID from the host and join the (superordinate domain repo ID: (subordinate host repo ID: (pending deposit: revision ID))) pair with the (domain repo ID: revision ID) pair obtained from the domain history query in order to map the host at watermark to the domain at watermark. We then proceed to create the (pending deposit: deposit fragment) pair for subordinate hosts using the added domain information.

Processing DepositFragment

The (pending deposit: deposit fragment) pairs from different resources are combined and grouped by pending deposit. For each pending deposit, all the relevant deposit fragments are written into an encrypted file stored on GCS. The filename is uniquely determined by the Beam job ID so there is no need to lock the GCS write operation to prevent stomping. The cursor for staging the pending deposit is then rolled forward, and the next action is enqueued. The latter two operations are performed in a transaction so the cursor is rolled back if enqueueing failed.
See Also: