Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

redesign when to generate ChunkGroupFooter #526

Merged
merged 3 commits into from
Dec 18, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1429,7 +1429,7 @@ private String queryAndWriteDataForMerge(IntervalFileNode backupIntervalFile)
isRowGroupHasData = true;
// the datasize and numOfChunk is fake
// the accurate datasize and numOfChunk will get after write all this deltaObject data.
fileIOWriter.startFlushChunkGroup(deltaObjectId,0,numOfChunk);
fileIOWriter.startFlushChunkGroup(deltaObjectId);//TODO please check me.
startPos = fileIOWriter.getPos();
}
// init the serieswWriteImpl
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,20 +75,21 @@ public static void flushMemTable(FileSchema fileSchema, TsFileIOWriter tsFileIOW
for (String deltaObjectId : iMemTable.getMemTableMap().keySet()) {
long startPos = tsFileIOWriter.getPos();
long recordCount = 0;
ChunkGroupFooter chunkGroupFooter = tsFileIOWriter.startFlushChunkGroup(deltaObjectId,0,
iMemTable.getMemTableMap().get(deltaObjectId).size());
tsFileIOWriter.startFlushChunkGroup(deltaObjectId);
int seriesNumber = iMemTable.getMemTableMap().get(deltaObjectId).size();
for (String measurementId : iMemTable.getMemTableMap().get(deltaObjectId).keySet()) {
//TODO if we can not use TSFileIO writer, then we have to redesign the class of TSFileIO.
IWritableMemChunk series = iMemTable.getMemTableMap().get(deltaObjectId).get(measurementId);
MeasurementSchema desc = fileSchema.getMeasurementSchema(measurementId);
PageWriter pageWriter = new PageWriter(desc);
ChunkBuffer chunkBuffer = new ChunkBuffer(desc);
IChunkWriter seriesWriter = new ChunkWriterImpl(desc,chunkBuffer, pageSizeThreshold);
IChunkWriter seriesWriter = new ChunkWriterImpl(desc, chunkBuffer, pageSizeThreshold);
recordCount += writeOneSeries(series.getSortedTimeValuePairList(), seriesWriter, desc.getType());
seriesWriter.writeToFileWriter(tsFileIOWriter);
}
long memSize = tsFileIOWriter.getPos() - startPos;
chunkGroupFooter.setDataSize(memSize);
tsFileIOWriter.endChunkGroup(chunkGroupFooter);
ChunkGroupFooter footer = new ChunkGroupFooter(deltaObjectId, memSize, seriesNumber);
tsFileIOWriter.endChunkGroup(footer);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,21 +41,25 @@ public void test() throws IOException {
assertEquals(TsFileIOWriter.magicStringBytes.length, bufferWriteIO.getPos());
assertEquals(0, bufferWriteIO.getAppendedRowGroupMetadata().size());
// construct one rowgroup
ChunkGroupFooter chunkGroupFooter = bufferWriteIO.startFlushChunkGroup("d1",1000,10);
bufferWriteIO.startFlushChunkGroup("d1");
ChunkGroupFooter chunkGroupFooter = new ChunkGroupFooter("d1", 1000, 10);
bufferWriteIO.endChunkGroup(chunkGroupFooter);
assertEquals(1, bufferWriteIO.getChunkGroupMetaDatas().size());
assertEquals(1, bufferWriteIO.getAppendedRowGroupMetadata().size());
List<ChunkGroupMetaData> metadatas = bufferWriteIO.getAppendedRowGroupMetadata();
ChunkGroupMetaData rowgroup = metadatas.get(0);
assertEquals("d1", rowgroup.getDeviceID());
// construct another two rowgroup
chunkGroupFooter = bufferWriteIO.startFlushChunkGroup("d1",1000,10);
bufferWriteIO.startFlushChunkGroup("d1");
chunkGroupFooter = new ChunkGroupFooter("d1", 1000, 10);
bufferWriteIO.endChunkGroup(chunkGroupFooter);

chunkGroupFooter = bufferWriteIO.startFlushChunkGroup("d1",1000,10);
bufferWriteIO.startFlushChunkGroup("d1");
chunkGroupFooter = new ChunkGroupFooter("d1", 1000, 10);
bufferWriteIO.endChunkGroup(chunkGroupFooter);

chunkGroupFooter = bufferWriteIO.startFlushChunkGroup("d1",1000,10);
bufferWriteIO.startFlushChunkGroup("d1");
chunkGroupFooter = new ChunkGroupFooter("d1", 1000, 10);
bufferWriteIO.endChunkGroup(chunkGroupFooter);

metadatas = bufferWriteIO.getAppendedRowGroupMetadata();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -246,20 +246,15 @@ private boolean checkMemorySizeAndMayFlushGroup() throws IOException {
private boolean flushAllChunkGroups() throws IOException {
if (recordCount > 0) {
long totalMemStart = fileWriter.getPos();
//make sure all the pages have been compressed into buffers, so that we can get correct groupWriter.getCurrentChunkGroupSize().
for (IChunkGroupWriter writer : groupWriters.values()) {
writer.preFlush();
}

for (String deviceId : groupWriters.keySet()) {
long pos = fileWriter.getPos();
IChunkGroupWriter groupWriter = groupWriters.get(deviceId);
long ChunkGroupSize = groupWriter.getCurrentChunkGroupSize();
ChunkGroupFooter chunkGroupFooter = fileWriter.startFlushChunkGroup(deviceId, ChunkGroupSize, groupWriter.getSeriesNumber());
groupWriter.flushToFileWriter(fileWriter);

if (fileWriter.getPos() - pos != ChunkGroupSize)
fileWriter.startFlushChunkGroup(deviceId);
ChunkGroupFooter chunkGroupFooter = groupWriter.flushToFileWriter(fileWriter);
if (fileWriter.getPos() - pos != chunkGroupFooter.getDataSize())
throw new IOException(String.format("Flushed data size is inconsistent with computation! Estimated: %d, Actuall: %d",
ChunkGroupSize, fileWriter.getPos() - pos));
chunkGroupFooter.getDataSize(), fileWriter.getPos() - pos));

fileWriter.endChunkGroup(chunkGroupFooter);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import java.util.List;
import java.util.Map;

import cn.edu.tsinghua.tsfile.file.footer.ChunkGroupFooter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -53,11 +54,15 @@ public void write(long time, List<DataPoint> data) throws WriteProcessException,
}

@Override
public void flushToFileWriter(TsFileIOWriter fileWriter) throws IOException {
/**
 * Flushes every chunk of this chunk group to the given file writer and returns the
 * ChunkGroupFooter describing what was written (device id, data size, series count).
 *
 * NOTE(review): the footer is built BEFORE the chunks are serialized; this is safe only
 * because sealAllChunks() has already forced all pages into their chunk buffers, so
 * getCurrentChunkGroupSize() is final at that point (see the comment below).
 */
public ChunkGroupFooter flushToFileWriter(TsFileIOWriter fileWriter) throws IOException {
LOG.debug("start flush device id:{}", deviceId);
//make sure all the pages have been compressed into buffers, so that we can get correct groupWriter.getCurrentChunkGroupSize().
sealAllChunks();
// Sizes are now stable: capture them in the footer the caller will pass to endChunkGroup().
ChunkGroupFooter footer = new ChunkGroupFooter(deviceId, getCurrentChunkGroupSize(), getSeriesNumber());
for (IChunkWriter seriesWriter : chunkWriters.values()) {
seriesWriter.writeToFileWriter(fileWriter);
}
return footer;
}

@Override
Expand All @@ -78,10 +83,12 @@ public long getCurrentChunkGroupSize() {
return size;
}

@Override
public void preFlush() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the preFlush or the sealCurrentPage should not be public.

/**
 * Seals, in force, all the chunks which may have un-sealed pages.
 */
// Forces every chunk writer to seal its current (possibly partial) page, so that each
// chunk's serialized size is final before the chunk group is written out.
private void sealAllChunks() {
for (IChunkWriter writer : chunkWriters.values()) {
// NOTE(review): the next two lines are interleaved diff text — 'preFlush()' is the
// removed pre-rename call and 'sealCurrentPage()' its replacement (see the rename in
// ChunkWriterImpl/IChunkWriter in this same PR); only one executes in the real source.
writer.preFlush();
writer.sealCurrentPage();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ private void writePage() {

@Override
public void writeToFileWriter(TsFileIOWriter tsfileWriter) throws IOException {
preFlush();
sealCurrentPage();
chunkBuffer.writeAllPagesOfSeriesToTsFile(tsfileWriter, chunkStatistics);
chunkBuffer.reset();
// reset series_statistics
Expand All @@ -243,7 +243,7 @@ public long getCurrentChunkSize(){
}

@Override
public void preFlush() {
public void sealCurrentPage() {
if (valueCountInOnePage > 0) {
writePage();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package cn.edu.tsinghua.tsfile.write.chunk;

import cn.edu.tsinghua.tsfile.file.footer.ChunkGroupFooter;
import cn.edu.tsinghua.tsfile.write.schema.MeasurementSchema;
import cn.edu.tsinghua.tsfile.exception.write.WriteProcessException;
import cn.edu.tsinghua.tsfile.write.writer.TsFileIOWriter;
Expand Down Expand Up @@ -35,7 +36,7 @@ public interface IChunkGroupWriter {
* @param tsfileWriter - TSFileIOWriter
* @throws IOException exception in IO
*/
void flushToFileWriter(TsFileIOWriter tsfileWriter) throws IOException;
ChunkGroupFooter flushToFileWriter(TsFileIOWriter tsfileWriter) throws IOException;

/**
* get the max memory occupied at this time.
Expand All @@ -56,14 +57,10 @@ public interface IChunkGroupWriter {
void addSeriesWriter(MeasurementSchema measurementSchema, int pageSize);

/**
* @return get the serialized size of current chunkGroup header + all chunks
* @return get the serialized size of current chunkGroup header + all chunks.
* Notice, the value does not include any un-sealed page in the chunks.
*/
long getCurrentChunkGroupSize();

/**
* call all the series to prepare to flush data.
*/
void preFlush();

int getSeriesNumber();
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,15 +59,16 @@ public interface IChunkWriter {
long estimateMaxSeriesMemSize();

/**
* return the serialized size of the chunk header + all pages
* return the serialized size of the chunk header + all pages (not include the un-sealed page).
* Notice, call this method before calling writeToFileWriter(), otherwise the page buffer in memory will be cleared.
*/
long getCurrentChunkSize();

/**
* prepare to flush data into file.
* forcibly seal the current page, even if it does not yet have enough data points.
*
*/
void preFlush();
void sealCurrentPage();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

each chunkwriter should control package a page by itself when a page is full or the chunk will be flushed.


int getNumOfPages();
}
Original file line number Diff line number Diff line change
Expand Up @@ -103,12 +103,9 @@ private void startFile() throws IOException {
* @param deviceId the device id of the chunk group to start flushing
*/
public ChunkGroupFooter startFlushChunkGroup(String deviceId, long dataSize, int numberOfChunks) throws IOException {
public void startFlushChunkGroup(String deviceId) throws IOException {
LOG.debug("start chunk group:{}, file position {}", deviceId, out.getPosition());
currentChunkGroupMetaData = new ChunkGroupMetaData(deviceId, new ArrayList<>());
ChunkGroupFooter footer = new ChunkGroupFooter(deviceId, dataSize, numberOfChunks);
LOG.debug("finishing writing chunk group header {}, file position {}", footer, out.getPosition());
return footer;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,10 @@ public void before() throws IOException {
statistics.updateStats(0L);

// chunk group 1
ChunkGroupFooter footer = writer.startFlushChunkGroup(deviceId, 0, 0);
writer.startFlushChunkGroup(deviceId);
writer.startFlushChunk(measurementSchema, measurementSchema.getCompressor().getType(), measurementSchema.getType(), measurementSchema.getEncodingType(), statistics, 0, 0, 0, 0);
writer.endChunk(0);
ChunkGroupFooter footer = new ChunkGroupFooter(deviceId, 0, 1);
writer.endChunkGroup(footer);

// end file
Expand Down