Skip to content

[Bug] high throughput stream load request for csv input could lead to inaccurate load response  #43241

Open
@TsukiokaKogane

Description

@TsukiokaKogane

Search before asking

  • I had searched in the issues and found no similar issues.

Version

2.1.6

What's Wrong?

In case of high throughput zero error tolerance stream load with csv format input, inaccurate response could occur like:

{
    "TxnId": 20035,
    "Label": "efe0c57c-ed07-4ee4-b61c-c2df93970fa1",
    "Comment": "",
    "TwoPhaseCommit": "false",
    "Status": "Fail",
    "Message": "[INTERNAL_ERROR]cancelled: closed",
    "NumberTotalRows": 0,
    "NumberLoadedRows": 0,
    "NumberFilteredRows": 0,
    "NumberUnselectedRows": 0,
    "LoadBytes": 0,
    "LoadTimeMs": 0,
    "BeginTxnTimeMs": 0,
    "StreamLoadPutTimeMs": 0,
    "ReadDataTimeMs": 0,
    "WriteDataTimeMs": 0,
    "CommitAndPublishTimeMs": 0,
    "ErrorURL": ""
}

The reason behind that is because the csv reader detected the error in CsvReader::_validate_line and closed the stream load pipe in CsvReader::close first.
After that StreamLoadAction::on_chunk_data failed to append data to the pipe and set the stream load context status to <INTERNAL_ERROR] cancelled>.
Finally in StreamLoadAction::handle, since the stream load context status is not ok, it just skips waiting and return the partial & inaccurate response before instance actually finished.

void StreamLoadAction::handle(HttpRequest* req) {
    std::shared_ptr<StreamLoadContext> ctx =
            std::static_pointer_cast<StreamLoadContext>(req->handler_ctx());
    if (ctx == nullptr) {
        return;
    }

    // status already set to fail
    if (ctx->status.ok()) { *// bypassed by error status*
        ctx->status = _handle(ctx);
        if (!ctx->status.ok() && !ctx->status.is<PUBLISH_TIMEOUT>()) {
            LOG(WARNING) << "handle streaming load failed, id=" << ctx->id
                         << ", errmsg=" << ctx->status;
        }
    }
    ctx->load_cost_millis = UnixMillis() - ctx->start_millis;

    if (!ctx->status.ok() && !ctx->status.is<PUBLISH_TIMEOUT>()) {
        if (ctx->need_rollback) {
            _exec_env->stream_load_executor()->rollback_txn(ctx.get());
            ctx->need_rollback = false;
        }
        if (ctx->body_sink.get() != nullptr) {
            ctx->body_sink->cancel(ctx->status.to_string());
        }
    }

    auto str = ctx->to_json();
    // add new line at end
    str = str + '\n';
    HttpChannel::send_reply(req, str);

}

What You Expected?

stream load should return full response

How to Reproduce?

No response

Anything Else?

I have 2 potential idea for fixing:

  • one is to simply remove if (ctx->status.ok()) condition in StreamLoadAction::handle
   // status already set to fail
    if (ctx->status.ok()) {
        ctx->status = _handle(ctx);
        if (!ctx->status.ok() && !ctx->status.is<PUBLISH_TIMEOUT>()) {
            LOG(WARNING) << "handle streaming load failed, id=" << ctx->id
                         << ", errmsg=" << ctx->status;
        }
    }
  • another is to set stream load status before the pipe closes

Looking forward for more opinions!

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions