docs(samples): Add Dataflow snippet for reading from Cloud Storage (#…
VeronicaWasson authored Oct 29, 2024
1 parent c6ae893 commit 19dcf09
Showing 6 changed files with 166 additions and 16 deletions.
ReadFromStorage.java (new file)
@@ -0,0 +1,61 @@
/*
* Copyright 2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.example.dataflow;

// [START dataflow_read_from_cloud_storage]
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptors;

public class ReadFromStorage {
  // [END dataflow_read_from_cloud_storage]
  public interface Options extends PipelineOptions {
    @Description("The Cloud Storage bucket to read from")
    String getBucket();

    void setBucket(String value);
  }

  public static PipelineResult.State main(String[] args) {
    var options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
    Pipeline pipeline = createPipeline(options);
    return pipeline.run().waitUntilFinish();
  }

  // [START dataflow_read_from_cloud_storage]
  public static Pipeline createPipeline(Options options) {
    var pipeline = Pipeline.create(options);
    pipeline
        // Read every .txt object in the bucket.
        .apply(TextIO.read().from("gs://" + options.getBucket() + "/*.txt"))
        .apply(
            MapElements.into(TypeDescriptors.strings())
                .via(x -> {
                  // Print each line and pass it through unchanged.
                  System.out.println(x);
                  return x;
                }));
    return pipeline;
  }
}
// [END dataflow_read_from_cloud_storage]
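
A minimal sketch of driving this snippet locally, assuming the DirectRunner is on the classpath and a placeholder bucket name "my-bucket" that holds one or more .txt objects (the integration test added below invokes main() the same way):

ReadFromStorage.Options options =
    PipelineOptionsFactory.fromArgs("--runner=DirectRunner", "--bucket=my-bucket")
        .withValidation()
        .as(ReadFromStorage.Options.class);

// Build the pipeline, run it, and block until it finishes.
PipelineResult.State state =
    ReadFromStorage.createPipeline(options).run().waitUntilFinish();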
ApacheIcebergIT.java
@@ -53,7 +53,7 @@

public class ApacheIcebergIT {
private ByteArrayOutputStream bout;
- private PrintStream out;
+ private final PrintStream originalOut = System.out;

private static final String CATALOG_NAME = "local";
private static final String TABLE_NAME = "table1";
@@ -112,8 +112,7 @@ private void writeTableRecord()
@Before
public void setUp() throws IOException {
bout = new ByteArrayOutputStream();
- out = new PrintStream(bout);
- System.setOut(out);
+ System.setOut(new PrintStream(bout));

// Create an Apache Iceberg catalog with a table.
warehouseDirectory = Files.createTempDirectory("test-warehouse");
@@ -131,7 +130,7 @@ public void setUp() throws IOException {
@After
public void tearDown() throws IOException {
Files.deleteIfExists(Paths.get(OUTPUT_FILE_NAME));
- System.setOut(null);
+ System.setOut(originalOut);
}

@Test
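
The same stdout fix repeats in each test file in this commit: the tests capture System.out to assert on pipeline output, and the old tearDown() called System.setOut(null), which leaves System.out null so any later print throws a NullPointerException. A minimal sketch of the corrected pattern, with field and method names taken from the diffs:

private final PrintStream originalOut = System.out;
private ByteArrayOutputStream bout;

@Before
public void setUp() {
  // Capture everything printed to stdout during the test.
  bout = new ByteArrayOutputStream();
  System.setOut(new PrintStream(bout));
}

@After
public void tearDown() {
  // Restore the real stdout instead of nulling it out.
  System.setOut(originalOut);
}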
BigQueryWriteIT.java
@@ -47,7 +47,7 @@ public class BigQueryWriteIT {
private static final String projectId = System.getenv("GOOGLE_CLOUD_PROJECT");

private ByteArrayOutputStream bout;
- private PrintStream out;
+ private final PrintStream originalOut = System.out;
private BigQuery bigquery;
private String datasetName;
private String tableName;
@@ -65,8 +65,7 @@ private void createTable() {
@Before
public void setUp() throws InterruptedException {
bout = new ByteArrayOutputStream();
- out = new PrintStream(bout);
- System.setOut(out);
+ System.setOut(new PrintStream(bout));

bigquery = BigQueryOptions.getDefaultInstance().getService();

@@ -79,7 +78,7 @@ public void setUp() throws InterruptedException {
public void tearDown() {
bigquery.delete(
DatasetId.of(projectId, datasetName), DatasetDeleteOption.deleteContents());
- System.setOut(null);
+ System.setOut(originalOut);
}

@Test
BiqQueryReadIT.java
@@ -45,16 +45,15 @@ public class BiqQueryReadIT {
private static final String projectId = System.getenv("GOOGLE_CLOUD_PROJECT");

private ByteArrayOutputStream bout;
- private PrintStream out;
+ private final PrintStream originalOut = System.out;
private BigQuery bigquery;
private String datasetName;
private String tableName;

@Before
public void setUp() throws InterruptedException {
bout = new ByteArrayOutputStream();
- out = new PrintStream(bout);
- System.setOut(out);
+ System.setOut(new PrintStream(bout));

bigquery = BigQueryOptions.getDefaultInstance().getService();

@@ -81,7 +80,7 @@ public void setUp() throws InterruptedException {
public void tearDown() {
bigquery.delete(
DatasetId.of(projectId, datasetName), DatasetDeleteOption.deleteContents());
- System.setOut(null);
+ System.setOut(originalOut);
}

@Test
PubSubWriteIT.java
@@ -47,7 +47,7 @@ public class PubSubWriteIT {
private static final String PROJECT_ID = System.getenv("GOOGLE_CLOUD_PROJECT");

private ByteArrayOutputStream bout;
- private PrintStream out;
+ private final PrintStream originalOut = System.out;
private String topicId;
private String subscriptionId;
TopicAdminClient topicAdminClient;
@@ -64,8 +64,7 @@ public void setUp() throws Exception {
requireEnvVar("GOOGLE_CLOUD_PROJECT");

bout = new ByteArrayOutputStream();
- out = new PrintStream(bout);
- System.setOut(out);
+ System.setOut(new PrintStream(bout));

topicId = "test_topic_" + UUID.randomUUID().toString().substring(0, 8);
subscriptionId = topicId + "-sub";
@@ -84,7 +83,7 @@ public void setUp() throws Exception {
public void tearDown() {
subscriptionAdminClient.deleteSubscription(SubscriptionName.of(PROJECT_ID, subscriptionId));
topicAdminClient.deleteTopic(TopicName.of(PROJECT_ID, topicId));
- System.setOut(null);
+ System.setOut(originalOut);
}

@Test
ReadFromStorageIT.java (new file)
@@ -0,0 +1,93 @@
/*
* Copyright 2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.example.dataflow;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.BucketInfo;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.testing.RemoteStorageHelper;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.beam.sdk.PipelineResult;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
public class ReadFromStorageIT {

  private static final String projectId = System.getenv("GOOGLE_CLOUD_PROJECT");

  private ByteArrayOutputStream bout;
  private final PrintStream originalOut = System.out;

  String bucketName;
  Storage storage;

  private static final String[] lines = {"line 1", "line 2"};

  @Before
  public void setUp() {
    // Redirect System.out so the test can capture the pipeline's output.
    bout = new ByteArrayOutputStream();
    System.setOut(new PrintStream(bout));

    // Create a Cloud Storage bucket with a text file.
    RemoteStorageHelper helper = RemoteStorageHelper.create();
    storage = helper.getOptions().getService();
    bucketName = RemoteStorageHelper.generateBucketName();
    storage.create(BucketInfo.of(bucketName));

    String objectName = "file1.txt";
    String contents = String.format("%s\n%s\n", lines[0], lines[1]);

    BlobId blobId = BlobId.of(bucketName, objectName);
    BlobInfo blobInfo = BlobInfo.newBuilder(blobId).build();
    byte[] content = contents.getBytes(StandardCharsets.UTF_8);

    storage.create(blobInfo, content);
  }

  @After
  public void tearDown() throws ExecutionException, InterruptedException {
    RemoteStorageHelper.forceDelete(storage, bucketName, 5, TimeUnit.SECONDS);

    System.setOut(originalOut);
    bout.reset();
  }

  @Test
  public void readFromStorage_shouldReadFile() throws Exception {
    PipelineResult.State state =
        ReadFromStorage.main(new String[] {"--runner=DirectRunner", "--bucket=" + bucketName});
    assertEquals(PipelineResult.State.DONE, state);

    String got = bout.toString();
    assertTrue(got.contains(lines[0]));
    assertTrue(got.contains(lines[1]));
  }
}
