@@ -153,6 +153,10 @@ public static Bound named(String name) {
153153 * <li>Must end with a letter or a number.</li>
154154 * <li>Cannot begin with 'goog' prefix.</li>
155155 * </ul>
156+ *
157+ * Dataflow will start reading data published on this topic from the time the pipeline is
158+ * started. Any data published on the topic before the pipeline is started will not be read
159+ * by Dataflow.
156160 */
157161 public static Bound topic (String topic ) {
158162 return new Bound ().topic (topic );
@@ -180,6 +184,44 @@ public static Bound subscription(String subscription) {
180184 return new Bound ().subscription (subscription );
181185 }
182186
187+ /**
188+ * Creates and returns a PubsubIO.Read PTransform where record timestamps are expected
189+ * to be provided using the PubSub labeling API. The {@code <timestampLabel>} parameter
190+ * specifies the label name. The label value sent to PubsSub is a numerical value representing
191+ * the number of milliseconds since the Unix epoch. For example, if using the joda time classes,
192+ * org.joda.time.Instant.getMillis() returns the correct value for this label.
193+ *
194+ * <p> If {@code <timestampLabel>} is not provided, the system will generate record timestamps
195+ * the first time it sees each record. All windowing will be done relative to these timestamps.
196+ * Windows are closed based on an estimate of when this source has finished producing data for
197+ * a timestamp range, which means that late data can arrive after a window has been closed. The
198+ * {#dropLateData} field allows you to control what to do with late data.
199+ */
200+ public static Bound timestampLabel (String timestampLabel ) {
201+ return new Bound ().timestampLabel (timestampLabel );
202+ }
203+
204+ /**
205+ * If true, then late-arriving data from this source will be dropped.
206+ */
207+ public static Bound dropLateData (boolean dropLateData ) {
208+ return new Bound ().dropLateData (dropLateData );
209+ }
210+
211+ /**
212+ * Creates and returns a PubSubIO.Read PTransform where unique record identifiers are
213+ * expected to be provided using the PubSub labeling API. The {@code <idLabel>} parameter
214+ * specifies the label name. The label value sent to PubSub can be any string value that
215+ * uniquely identifies this record.
216+ *
217+ * <p> If idLabel is not provided, Dataflow cannot guarantee that no duplicate data will be
218+ * delivered on the PubSub stream. In this case, deduplication of the stream will be
219+ * stricly best effort.
220+ */
221+ public static Bound idLabel (String idLabel ) {
222+ return new Bound ().idLabel (idLabel );
223+ }
224+
183225 /**
184226 * A PTransform that reads from a PubSub source and returns
185227 * a unbounded PCollection containing the items from the stream.
@@ -191,10 +233,22 @@ public static class Bound
191233 String topic ;
192234 /** The Pubsub subscription to read from. */
193235 String subscription ;
236+ /** The Pubsub label to read timestamps from. */
237+ String timestampLabel ;
238+ Boolean dropLateData ;
239+ /** This is set for backwards compatibility with old services. If dropLateData is not
240+ * explicitly called, then we won't forward that parameter to the service. */
241+ Boolean dropLateDataExplicit ;
242+ /** The Pubsub label to read ids from. */
243+ String idLabel ;
244+
245+ Bound () {
246+ this .dropLateData = true ;
247+ this .dropLateDataExplicit = false ;
248+ }
194249
195- Bound () {}
196-
197- Bound (String name , String subscription , String topic ) {
250+ Bound (String name , String subscription , String topic , String timestampLabel ,
251+ boolean dropLateData , boolean dropLateDataExplicit , String idLabel ) {
198252 super (name );
199253 if (subscription != null ) {
200254 Validator .validateSubscriptionName (subscription );
@@ -204,18 +258,63 @@ public static class Bound
204258 }
205259 this .subscription = subscription ;
206260 this .topic = topic ;
261+ this .timestampLabel = timestampLabel ;
262+ this .dropLateData = dropLateData ;
263+ this .dropLateDataExplicit = dropLateDataExplicit ;
264+ this .idLabel = idLabel ;
207265 }
208266
267+ /**
268+ * Returns a new TextIO.Read PTransform that's like this one but with the given
269+ * step name. Does not modify the object.
270+ */
209271 public Bound named (String name ) {
210- return new Bound (name , subscription , topic );
272+ return new Bound (name , subscription , topic , timestampLabel , dropLateData ,
273+ dropLateDataExplicit , idLabel );
211274 }
212275
276+ /**
277+ * Returns a new TextIO.Read PTransform that's like this one but reading from the
278+ * given subscription. Does not modify the object.
279+ */
213280 public Bound subscription (String subscription ) {
214- return new Bound (name , subscription , topic );
281+ return new Bound (name , subscription , topic , timestampLabel , dropLateData ,
282+ dropLateDataExplicit , idLabel );
215283 }
216284
285+ /**
286+ * Returns a new TextIO.Read PTransform that's like this one but reading from the
287+ * give topic. Does not modify the object.
288+ */
217289 public Bound topic (String topic ) {
218- return new Bound (name , subscription , topic );
290+ return new Bound (name , subscription , topic , timestampLabel , dropLateData ,
291+ dropLateDataExplicit , idLabel );
292+ }
293+
294+ /**
295+ * Returns a new TextIO.Read PTransform that's like this one but reading timestamps
296+ * from the given PubSub label. Does not modify the object.
297+ */
298+ public Bound timestampLabel (String timestampLabel ) {
299+ return new Bound (name , subscription , topic , timestampLabel , dropLateData ,
300+ dropLateDataExplicit , idLabel );
301+ }
302+
303+ /**
304+ * Returns a new TextIO.Read PTransform that's like this one but with the specified
305+ * setting for dropLateData. Does not modify the object.
306+ */
307+ public Bound dropLateData (boolean dropLateData ) {
308+ return new Bound (name , subscription , topic , timestampLabel , dropLateData , true , idLabel );
309+ }
310+
311+ /**
312+ * Returns a new TextIO.Read PTransform that's like this one but reading unique ids
313+ * from the given PubSub label. Does not modify the object.
314+ */
315+ public Bound idLabel (String idLabel ) {
316+ return new Bound (name , subscription , topic , timestampLabel , dropLateData ,
317+ dropLateDataExplicit , idLabel );
219318 }
220319
221320 @ Override
@@ -250,6 +349,22 @@ public String getSubscription() {
250349 return subscription ;
251350 }
252351
352+ public String getTimestampLabel () {
353+ return timestampLabel ;
354+ }
355+
356+ public boolean getDropLateData () {
357+ return dropLateData ;
358+ }
359+
360+ public boolean getDropLateDataExplicit () {
361+ return dropLateDataExplicit ;
362+ }
363+
364+ public String getIdLabel () {
365+ return idLabel ;
366+ }
367+
253368 static {
254369 // TODO: Figure out how to make this work under
255370 // DirectPipelineRunner.
@@ -278,6 +393,30 @@ public static Bound topic(String topic) {
278393 return new Bound ().topic (topic );
279394 }
280395
396+ /**
397+ * If specified, Dataflow will add a Pubsub label to each output record specifying the logical
398+ * timestamp of the record. {@code <timestampLabel>} determines the label name. The label value
399+ * is a numerical value representing the number of milliseconds since the Unix epoch. For
400+ * example, if using the joda time classes, the org.joda.time.Instant(long) constructor can be
401+ * used to parse this value. If the output from this sink is being read by another Dataflow
402+ * source, then PubsubIO.Read.timestampLabel can be used to ensure that the other source reads
403+ * these timestamps from the appropriate label.
404+ */
405+ public static Bound timestampLabel (String timestampLabel ) {
406+ return new Bound ().timestampLabel (timestampLabel );
407+ }
408+
409+ /**
410+ * If specified, Dataflow will add a Pubsub label to each output record containing a unique
411+ * identifier for that record. {@code <idLabel>} determines the label name. The label value
412+ * is an opaque string value. This is useful if the the output from this sink is being read
413+ * by another Dataflow source, in which case PubsubIO.Read.idLabel can be used to ensure that
414+ * the other source reads these ids from the appropriate label.
415+ */
416+ public static Bound idLabel (String idLabel ) {
417+ return new Bound ().idLabel (idLabel );
418+ }
419+
281420 /**
282421 * A PTransfrom that writes a unbounded {@code PCollection<String>}
283422 * to a PubSub stream.
@@ -287,23 +426,51 @@ public static class Bound
287426 extends PTransform <PCollection <String >, PDone > {
288427 /** The Pubsub topic to publish to. */
289428 String topic ;
429+ String timestampLabel ;
430+ String idLabel ;
290431
291432 Bound () {}
292433
293- Bound (String name , String topic ) {
434+ Bound (String name , String topic , String timestampLabel , String idLabel ) {
294435 super (name );
295436 if (topic != null ) {
296437 Validator .validateTopicName (topic );
297438 this .topic = topic ;
298439 }
440+ this .timestampLabel = timestampLabel ;
441+ this .idLabel = idLabel ;
299442 }
300443
444+ /**
445+ * Returns a new TextIO.Write PTransform that's like this one but with the given step
446+ * name. Does not modify the object.
447+ */
301448 public Bound named (String name ) {
302- return new Bound (name , topic );
449+ return new Bound (name , topic , timestampLabel , idLabel );
303450 }
304451
452+ /**
453+ * Returns a new TextIO.Write PTransform that's like this one but writing to the given
454+ * topic. Does not modify the object.
455+ */
305456 public Bound topic (String topic ) {
306- return new Bound (name , topic );
457+ return new Bound (name , topic , timestampLabel , idLabel );
458+ }
459+
460+ /**
461+ * Returns a new TextIO.Write PTransform that's like this one but publishing timestamps
462+ * to the given PubSub label. Does not modify the object.
463+ */
464+ public Bound timestampLabel (String timestampLabel ) {
465+ return new Bound (name , topic , timestampLabel , idLabel );
466+ }
467+
468+ /**
469+ * Returns a new TextIO.Write PTransform that's like this one but publishing record ids
470+ * to the given PubSub label. Does not modify the object.
471+ */
472+ public Bound idLabel (String idLabel ) {
473+ return new Bound (name , topic , timestampLabel , idLabel );
307474 }
308475
309476 @ Override
@@ -327,6 +494,14 @@ public String getTopic() {
327494 return topic ;
328495 }
329496
497+ public String getTimestampLabel () {
498+ return timestampLabel ;
499+ }
500+
501+ public String getIdLabel () {
502+ return idLabel ;
503+ }
504+
330505 static {
331506 // TODO: Figure out how to make this work under
332507 // DirectPipelineRunner.
0 commit comments