Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ public void doRefresh() {
refreshFromMetadataLocation(metadataLocation);
}

@SuppressWarnings("checkstyle:CyclomaticComplexity")
@SuppressWarnings({"checkstyle:CyclomaticComplexity", "MethodLength"})
@Override
public void doCommit(ViewMetadata base, ViewMetadata metadata) {
boolean newView = base == null;
Expand Down Expand Up @@ -209,16 +209,6 @@ public void doCommit(ViewMetadata base, ViewMetadata metadata) {
throw e;

} catch (Throwable e) {
if (e.getMessage() != null
&& e.getMessage()
.contains(
"The table has been modified. The parameter value for key '"
+ BaseMetastoreTableOperations.METADATA_LOCATION_PROP
+ "' is")) {
throw new CommitFailedException(
e, "The view %s.%s has been modified concurrently", database, viewName);
}

if (e.getMessage() != null
&& e.getMessage().contains("Table/View 'HIVE_LOCKS' does not exist")) {
throw new RuntimeException(
Expand All @@ -228,18 +218,41 @@ public void doCommit(ViewMetadata base, ViewMetadata metadata) {
e);
}

LOG.error(
"Cannot tell if commit to {}.{} succeeded, attempting to reconnect and check.",
database,
viewName,
e);
commitStatus = BaseMetastoreOperations.CommitStatus.UNKNOWN;
commitStatus =
checkCommitStatus(
viewName,
newMetadataLocation,
metadata.properties(),
() -> checkCurrentMetadataLocation(newMetadataLocation));
if (e.getMessage() != null
&& e.getMessage()
.contains(
"The table has been modified. The parameter value for key '"
+ BaseMetastoreTableOperations.METADATA_LOCATION_PROP
+ "' is")) {
// It's possible the HMS client incorrectly retries a successful operation, due to network
// issue for example, and triggers this exception. So we need double-check to make sure
// this is really a concurrent modification. Hitting this exception means no pending
// requests, if any, can succeed later, so it's safe to check status in strict mode
commitStatus =
checkCommitStatusStrict(
viewName,
newMetadataLocation,
metadata.properties(),
() -> checkCurrentMetadataLocation(newMetadataLocation));
if (commitStatus == BaseMetastoreOperations.CommitStatus.FAILURE) {
throw new CommitFailedException(
e, "The view %s.%s has been modified concurrently", database, viewName);
}
} else {
LOG.error(
"Cannot tell if commit to {}.{} succeeded, attempting to reconnect and check.",
database,
viewName,
e);
commitStatus =
checkCommitStatus(
viewName,
newMetadataLocation,
metadata.properties(),
() -> checkCurrentMetadataLocation(newMetadataLocation));
}

switch (commitStatus) {
case SUCCESS:
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -430,6 +430,50 @@ public void testNoLockThriftExceptionConcurrentCommit() throws TException, Inter
.isEqualTo(1);
}

/**
* Pretends the HMS client incorrectly reports a concurrent modification error even though the
* underlying alter actually succeeded, e.g. because the client retried an already-successful
* request due to a network issue. The commit should double-check the actual metadata location
* before giving up, find that the new location is already current, and complete without throwing.
*/
@Test
public void modifiedConcurrentlyExceptionSucceedsWhenActuallyCommitted()
throws TException, InterruptedException {
HiveViewOperations ops = (HiveViewOperations) ((BaseView) view).operations();
ViewMetadata metadataV1 = ops.current();
assertThat(metadataV1.properties()).hasSize(0);

view.updateProperties().set("k1", "v1").commit();
ops.refresh();
ViewMetadata metadataV2 = ops.current();
assertThat(metadataV2.properties()).hasSize(1).containsEntry("k1", "v1");

HiveViewOperations spyOps = spy(ops);

// Persist for real, but then report a concurrent modification error as if the HMS client had
// incorrectly retried an already-successful alter_table call
doAnswer(
i -> {
org.apache.hadoop.hive.metastore.api.Table tbl =
i.getArgument(0, org.apache.hadoop.hive.metastore.api.Table.class);
boolean updateHiveView = i.getArgument(1, Boolean.class);
String location = i.getArgument(2, String.class);
ops.persistTable(tbl, updateHiveView, location);
throw new RuntimeException(
"MetaException(message:The table has been modified. The parameter value for key 'metadata_location' is");
})
.when(spyOps)
.persistTable(any(), anyBoolean(), any());

spyOps.commit(metadataV2, metadataV1);

ops.refresh();
assertThat(ops.current().metadataFileLocation())
.as("Commit should succeed once the double check finds the new location is current")
.isEqualTo(metadataV1.metadataFileLocation());
assertThat(ops.current().properties()).hasSize(0);
}

@Test
public void testLockExceptionUnknownSuccessCommit() throws TException, InterruptedException {
HiveViewOperations ops = (HiveViewOperations) ((BaseView) view).operations();
Expand Down