Skip to content

Disk buffering reading refactor #1917

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
6 changes: 0 additions & 6 deletions disk-buffering/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,6 @@ The configurable parameters are provided **per exporter**, the available ones ar
* Max age for file reading, defaults to 18 hours. After that time passes, the file will be
considered stale and will be removed when new files are created. No more data will be read from a
file past this time.
* An instance
of [TemporaryFileProvider](src/main/java/io/opentelemetry/contrib/disk/buffering/config/TemporaryFileProvider.java),
defaults to calling `File.createTempFile`. This provider will be used when reading from the disk
in order create a temporary file from which each line (batch of signals) will be read and
sequentially get removed from the original cache file right after the data has been successfully
exported.

## Usage

Expand Down
8 changes: 2 additions & 6 deletions disk-buffering/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,6 @@ plugins {
description = "Exporter implementations that store signals on disk"
otelJava.moduleName.set("io.opentelemetry.contrib.exporters.disk")

java {
sourceCompatibility = JavaVersion.VERSION_1_8
targetCompatibility = JavaVersion.VERSION_1_8
}

val protos by configurations.creating

dependencies {
Expand Down Expand Up @@ -75,7 +70,8 @@ wire {

tasks.named<ShadowJar>("shadowJar") {
archiveClassifier.set("")
configurations = emptyList() // To avoid embedding any dependencies as we only need to rename some local packages.
configurations =
emptyList() // To avoid embedding any dependencies as we only need to rename some local packages.
relocate("io.opentelemetry.proto", "io.opentelemetry.diskbuffering.proto")
mustRunAfter("jar")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
package io.opentelemetry.contrib.disk.buffering.config;

import com.google.auto.value.AutoValue;
import io.opentelemetry.contrib.disk.buffering.internal.files.DefaultTemporaryFileProvider;
import java.io.File;
import java.util.concurrent.TimeUnit;

Expand Down Expand Up @@ -49,23 +48,18 @@ public abstract class StorageConfiguration {
*/
public abstract int getMaxFolderSize();

/** A creator of temporary files needed to do the disk reading process. */
public abstract TemporaryFileProvider getTemporaryFileProvider();

public static StorageConfiguration getDefault(File rootDir) {
return builder().setRootDir(rootDir).build();
}

public static Builder builder() {
TemporaryFileProvider fileProvider = DefaultTemporaryFileProvider.getInstance();
return new AutoValue_StorageConfiguration.Builder()
.setMaxFileSize(1024 * 1024) // 1MB
.setMaxFolderSize(10 * 1024 * 1024) // 10MB
.setMaxFileAgeForWriteMillis(TimeUnit.SECONDS.toMillis(30))
.setMinFileAgeForReadMillis(TimeUnit.SECONDS.toMillis(33))
.setMaxFileAgeForReadMillis(TimeUnit.HOURS.toMillis(18))
.setDebugEnabled(false)
.setTemporaryFileProvider(fileProvider);
.setDebugEnabled(false);
}

@AutoValue.Builder
Expand All @@ -80,8 +74,6 @@ public abstract static class Builder {

public abstract Builder setMaxFolderSize(int value);

public abstract Builder setTemporaryFileProvider(TemporaryFileProvider value);

public abstract Builder setRootDir(File rootDir);

public abstract Builder setDebugEnabled(boolean debugEnabled);
Expand Down

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,9 @@
import java.io.File;

public interface FileOperations extends Closeable {
long getSize();

boolean hasExpired();

boolean isClosed();

File getFile();

default String getFileName() {
return getFile().getName();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,11 @@
import io.opentelemetry.contrib.disk.buffering.internal.storage.files.reader.ProcessResult;
import io.opentelemetry.contrib.disk.buffering.internal.storage.files.reader.ReadResult;
import io.opentelemetry.contrib.disk.buffering.internal.storage.files.reader.StreamReader;
import io.opentelemetry.contrib.disk.buffering.internal.storage.files.utils.FileTransferUtil;
import io.opentelemetry.contrib.disk.buffering.internal.storage.files.utils.FileStream;
import io.opentelemetry.contrib.disk.buffering.internal.storage.responses.ReadableResult;
import io.opentelemetry.sdk.common.Clock;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import javax.annotation.Nullable;
Expand All @@ -37,14 +33,11 @@
*/
public final class ReadableFile implements FileOperations {
@NotNull private final File file;
private final int originalFileSize;
private final FileStream fileStream;
private final StreamReader reader;
private final FileTransferUtil fileTransferUtil;
private final File temporaryFile;
private final Clock clock;
private final long expireTimeMillis;
private final AtomicBoolean isClosed = new AtomicBoolean(false);
private int readBytes = 0;
@Nullable private ReadResult unconsumedResult;

public ReadableFile(
Expand All @@ -59,7 +52,7 @@ public ReadableFile(
}

public ReadableFile(
File file,
@NotNull File file,
long createdTimeMillis,
Clock clock,
StorageConfiguration configuration,
Expand All @@ -68,12 +61,8 @@ public ReadableFile(
this.file = file;
this.clock = clock;
expireTimeMillis = createdTimeMillis + configuration.getMaxFileAgeForReadMillis();
originalFileSize = (int) file.length();
temporaryFile = configuration.getTemporaryFileProvider().createTemporaryFile(file.getName());
copyFile(file, temporaryFile);
FileInputStream tempInputStream = new FileInputStream(temporaryFile);
fileTransferUtil = new FileTransferUtil(tempInputStream, file);
reader = readerFactory.create(tempInputStream);
fileStream = FileStream.create(file);
reader = readerFactory.create(fileStream);
}

/**
Expand Down Expand Up @@ -101,11 +90,8 @@ public synchronized ReadableResult readAndProcess(Function<byte[], ProcessResult
switch (processing.apply(read.content)) {
case SUCCEEDED:
unconsumedResult = null;
readBytes += read.totalReadLength;
int amountOfBytesToTransfer = originalFileSize - readBytes;
if (amountOfBytesToTransfer > 0) {
fileTransferUtil.transferBytes(readBytes, amountOfBytesToTransfer);
} else {
fileStream.truncateTop();
if (fileStream.size() == 0) {
cleanUp();
}
return ReadableResult.SUCCEEDED;
Expand All @@ -124,17 +110,7 @@ private ReadResult readNextItem() throws IOException {
if (unconsumedResult != null) {
return unconsumedResult;
}
return reader.read();
}

private void cleanUp() throws IOException {
file.delete();
close();
}

@Override
public long getSize() {
return originalFileSize;
return reader.readNext();
}

@Override
Expand All @@ -153,29 +129,18 @@ public File getFile() {
return file;
}

private void cleanUp() throws IOException {
close();
if (!file.delete()) {
throw new IOException("Could not delete file: " + file);
}
}

@Override
public synchronized void close() throws IOException {
if (isClosed.compareAndSet(false, true)) {
unconsumedResult = null;
fileTransferUtil.close();
reader.close();
temporaryFile.delete();
}
}

/**
* This is needed instead of using Files.copy in order to keep it compatible with Android api <
* 26.
*/
private static void copyFile(File from, File to) throws IOException {
try (InputStream in = new FileInputStream(from);
OutputStream out = new FileOutputStream(to)) {

byte[] buffer = new byte[1024];
int lengthRead;
while ((lengthRead = in.read(buffer)) > 0) {
out.write(buffer, 0, lengthRead);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ public synchronized WritableResult append(byte[] data) throws IOException {
return WritableResult.SUCCEEDED;
}

@Override
public synchronized long getSize() {
return size;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,21 @@

package io.opentelemetry.contrib.disk.buffering.internal.storage.files.reader;

import io.opentelemetry.contrib.disk.buffering.internal.storage.files.utils.CountingInputStream;
import io.opentelemetry.contrib.disk.buffering.internal.utils.ProtobufTools;
import java.io.IOException;
import java.io.InputStream;
import javax.annotation.Nullable;

public final class DelimitedProtoStreamReader extends StreamReader {
private final CountingInputStream countingInputStream;
public final class DelimitedProtoStreamReader implements StreamReader {
private final InputStream inputStream;

public DelimitedProtoStreamReader(InputStream inputStream) {
super(new CountingInputStream(inputStream));
countingInputStream = (CountingInputStream) this.inputStream;
this.inputStream = inputStream;
}

@Override
@Nullable
public ReadResult read() throws IOException {
int startingPosition = countingInputStream.getPosition();
public ReadResult readNext() throws IOException {
int itemSize = getNextItemSize();
if (itemSize < 1) {
return null;
Expand All @@ -31,7 +28,7 @@ public ReadResult read() throws IOException {
if (inputStream.read(bytes) < 0) {
return null;
}
return new ReadResult(bytes, countingInputStream.getPosition() - startingPosition);
return new ReadResult(bytes);
}

private int getNextItemSize() {
Expand All @@ -46,6 +43,11 @@ private int getNextItemSize() {
}
}

@Override
public void close() throws IOException {
inputStream.close();
}

public static class Factory implements StreamReader.Factory {

private static final Factory INSTANCE = new DelimitedProtoStreamReader.Factory();
Expand All @@ -57,8 +59,8 @@ public static Factory getInstance() {
private Factory() {}

@Override
public StreamReader create(InputStream stream) {
return new DelimitedProtoStreamReader(stream);
public StreamReader create(InputStream inputStream) {
return new DelimitedProtoStreamReader(inputStream);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,7 @@ public final class ReadResult {
/** The consumable data. */
public final byte[] content;

/**
* The total amount of data read from the stream. This number can be greater than the content
* length as it also takes into account any delimiters size.
*/
public final int totalReadLength;

public ReadResult(byte[] content, int totalReadLength) {
public ReadResult(byte[] content) {
this.content = content;
this.totalReadLength = totalReadLength;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,22 +10,11 @@
import java.io.InputStream;
import javax.annotation.Nullable;

public abstract class StreamReader implements Closeable {
protected final InputStream inputStream;

protected StreamReader(InputStream inputStream) {
this.inputStream = inputStream;
}

public interface StreamReader extends Closeable {
@Nullable
public abstract ReadResult read() throws IOException;

@Override
public void close() throws IOException {
inputStream.close();
}
ReadResult readNext() throws IOException;

public interface Factory {
interface Factory {
StreamReader create(InputStream stream);
}
}
Loading
Loading