Package google.registry.beam.rde
Class RdePipeline
java.lang.Object
google.registry.beam.rde.RdePipeline
All Implemented Interfaces: Serializable
Definition of a Dataflow Flex template, which generates RDE/BRDA deposits.
To stage this template locally, run ./nom_build :core:sBP --environment=alpha --pipeline=rde.
Then, you can run the staged template via the API client library, gcloud, or a raw REST call.
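As an illustration, launching the staged flex template with gcloud could look roughly like the following. Every identifier below (project, region, bucket, metadata filename, and the pendingDeposits parameter name) is a placeholder assumption, not the pipeline's actual configuration:

```shell
# All values below are hypothetical; substitute your own staging values.
gcloud dataflow flex-template run "rde-staging-job" \
  --project=my-alpha-project \
  --region=us-central1 \
  --template-file-gcs-location=gs://my-deploy-bucket/beam/rde_pipeline_metadata.json \
  --parameters pendingDeposits="<encoded-pending-deposits>"
```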
This pipeline only works for pending deposits with the same watermark; the RdeStagingAction batches such pending deposits together and launches multiple pipelines if multiple watermarks exist.
The pipeline is broadly divided into two parts: creating the DepositFragments, and processing them.
Creating DepositFragments
Registrar
Non-test registrar entities are loaded from Cloud SQL and marshalled into deposit fragments. They
are NOT rewound to the watermark.
EppResource
All EPP resources are loaded from the corresponding HistoryEntry, which has the resource embedded. In general, we find the most recent history entry before the watermark and filter out the ones that are soft-deleted by the watermark. The history is emitted as pairs of (resource repo ID: history revision ID) from the SQL query.
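The selection described above can be sketched in plain Java. This is a simplified in-memory stand-in for the SQL query, not the pipeline's actual code; the History record and its field names are hypothetical:

```java
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

public class WatermarkSelector {
  /** Hypothetical, simplified stand-in for a HistoryEntry row. */
  public record History(
      String repoId, long revisionId, long modificationTime, boolean deleted) {}

  /**
   * For each repo ID, picks the most recent history entry at or before the
   * watermark, then drops resources soft-deleted by that point, returning
   * (resource repo ID -> history revision ID) pairs like the SQL query above.
   */
  public static Map<String, Long> latestLiveAt(List<History> histories, long watermark) {
    return histories.stream()
        // Ignore history entries written after the watermark.
        .filter(h -> h.modificationTime() <= watermark)
        // Keep only the most recent entry per resource.
        .collect(Collectors.groupingBy(
            History::repoId,
            Collectors.maxBy(Comparator.comparingLong(History::modificationTime))))
        .values().stream()
        .flatMap(Optional::stream)
        // Drop resources that were soft-deleted by the watermark.
        .filter(h -> !h.deleted())
        .collect(Collectors.toMap(History::repoId, History::revisionId));
  }
}
```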
Domain
After the most recent (live) domain resources are loaded from the corresponding history objects,
we marshall them to deposit fragments and emit the (pending deposit: deposit fragment) pairs for
further processing. We also find all the contacts and hosts referenced by a given domain and emit
pairs of (contact/host repo ID: pending deposit) for all RDE pending deposits for further
processing.
Contact
We first join the most recent contact histories, represented by (contact repo ID: contact history revision ID) pairs, with referenced contacts, represented by (contact repo ID: pending deposit) pairs, on the contact repo ID, to remove unreferenced contact histories. Contact resources are then loaded from the remaining referenced contact histories and marshalled into (pending deposit: deposit fragment) pairs.
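The effect of that join can be illustrated with a small in-memory sketch (the real pipeline does this with keyed PCollections; the class and method names here are hypothetical):

```java
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

public class ContactJoin {
  /**
   * Joins (contact repo ID -> history revision ID) pairs with the set of
   * contact repo IDs referenced by some domain, keeping only the referenced
   * contact histories, as the join on contact repo ID above does.
   */
  public static Map<String, Long> referencedOnly(
      Map<String, Long> contactHistories, Set<String> referencedRepoIds) {
    return contactHistories.entrySet().stream()
        .filter(e -> referencedRepoIds.contains(e.getKey()))
        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
  }
}
```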
Host
Similar to Contact, we join the most recent host histories with referenced hosts to find the most recent referenced hosts. External hosts receive the same treatment as contacts, yielding the (pending deposit: deposit fragment) pairs. For subordinate hosts, we also need to find the superordinate domain in order to properly handle pending transfers in the deposit. So we first find the superordinate domain repo ID from the host and join the (superordinate domain repo ID: (subordinate host repo ID: (pending deposit: revision ID))) pair with the (domain repo ID: revision ID) pair obtained from the domain history query, in order to map the host at the watermark to the domain at the watermark. We then create the (pending deposit: deposit fragment) pairs for subordinate hosts using the added domain information.
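The core of the subordinate-host step, resolving each host's superordinate domain revision at the watermark, can be sketched as a simple join (flattening away the nested key structure; the names here are hypothetical):

```java
import java.util.Map;
import java.util.stream.Collectors;

public class HostDomainJoin {
  /**
   * For each subordinate host, resolves its superordinate domain's history
   * revision at the watermark by joining the (host repo ID -> domain repo ID)
   * mapping with the (domain repo ID -> revision ID) pairs from the domain
   * history query. Hosts whose domain has no live revision are dropped.
   */
  public static Map<String, Long> domainRevisionForHost(
      Map<String, String> hostToSuperordinateDomain, Map<String, Long> domainRevisions) {
    return hostToSuperordinateDomain.entrySet().stream()
        .filter(e -> domainRevisions.containsKey(e.getValue()))
        .collect(Collectors.toMap(Map.Entry::getKey, e -> domainRevisions.get(e.getValue())));
  }
}
```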
Processing DepositFragments
The (pending deposit: deposit fragment) pairs from different resources are combined and grouped
by pending deposit. For each pending deposit, all the relevant deposit fragments are written into
an encrypted file stored on GCS. The filename is uniquely determined by the Beam job ID so there
is no need to lock the GCS write operation to prevent stomping. The cursor for staging the
pending deposit is then rolled forward, and the next action is enqueued. The latter two operations are performed in a single transaction, so the cursor is rolled back if enqueueing fails.
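The grouping and the job-ID-based naming can be sketched as follows. The Fragment record and the filename scheme are illustrative assumptions, not the pipeline's actual types or naming convention:

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class DepositWriter {
  /** Hypothetical, simplified stand-in for a DepositFragment. */
  public record Fragment(String pendingDeposit, String xml) {}

  /** Groups deposit fragments by pending deposit, as done before the GCS write. */
  public static Map<String, List<String>> groupByDeposit(List<Fragment> fragments) {
    return fragments.stream().collect(Collectors.groupingBy(
        Fragment::pendingDeposit,
        Collectors.mapping(Fragment::xml, Collectors.toList())));
  }

  /**
   * Derives a filename from the Beam job ID; because every run has a unique
   * job ID, concurrent runs cannot stomp each other's output, so no lock is
   * needed. The naming scheme here is illustrative only.
   */
  public static String filename(String pendingDeposit, String beamJobId) {
    return pendingDeposit + "-" + beamJobId + ".xml.ghostryde";
  }
}
```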
Nested Class Summary
Modifier and TypeClassDescriptionprotected static class
A utility class that containsTupleTag
s whenPCollectionTuple
s andCoGbkResult
s are used. -
Method Summary
static String encodePendingDeposits(com.google.common.collect.ImmutableSet<PendingDeposit> pendingDeposits)
Encodes the pending deposit set in a URL-safe string that is sent to the pipeline worker by the pipeline launcher as a pipeline option.
static void main(String[] args)
Method Details

encodePendingDeposits
public static String encodePendingDeposits(com.google.common.collect.ImmutableSet<PendingDeposit> pendingDeposits) throws IOException
Encodes the pending deposit set in a URL-safe string that is sent to the pipeline worker by the pipeline launcher as a pipeline option.
Throws: IOException
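The general shape one would expect for such an encoding, Java serialization plus the URL-safe Base64 alphabet, can be sketched like this. Strings stand in for PendingDeposit, the class name is hypothetical, and checked exceptions are wrapped for brevity (the real method declares throws IOException):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.util.Base64;
import java.util.Set;
import java.util.TreeSet;

public class DepositCodec {
  /** Serializes a set and encodes it with the URL-safe Base64 alphabet. */
  public static String encode(Set<String> deposits) {
    try {
      ByteArrayOutputStream bos = new ByteArrayOutputStream();
      try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
        // TreeSet is Serializable and gives a deterministic order.
        oos.writeObject(new TreeSet<>(deposits));
      }
      // Padding '=' is not URL-safe, so it is omitted.
      return Base64.getUrlEncoder().withoutPadding().encodeToString(bos.toByteArray());
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  /** Reverses encode(), as the pipeline worker would on receipt. */
  @SuppressWarnings("unchecked")
  public static Set<String> decode(String encoded) {
    try (ObjectInputStream ois = new ObjectInputStream(
        new ByteArrayInputStream(Base64.getUrlDecoder().decode(encoded)))) {
      return (Set<String>) ois.readObject();
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalArgumentException("malformed encoded deposits", e);
    }
  }
}
```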
main
Throws: IOException, ClassNotFoundException