HIVE-26563: Add extra columns in Show Compactions output and sort the output (Kirti Ruge, reviewed by Denys Kuzmenko)

Closes apache#3625
rkirtir authored and yeahyung committed Jul 20, 2023
1 parent 51ad727 commit 4d0dc5f
Showing 16 changed files with 905 additions and 122 deletions.
@@ -35,6 +35,7 @@
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
import java.io.*;

import static org.apache.hadoop.hive.ql.txn.compactor.TestCompactorBase.executeStatementOnDriver;

@@ -137,9 +138,9 @@ public void testShowCompactionsContainsPoolName() throws Exception {
List results = new ArrayList();
driver.getResults(results);
Assert.assertEquals(3, results.size());
Assert.assertEquals("CompactionId\tDatabase\tTable\tPartition\tType\tState\tWorker host\tWorker\tEnqueue Time\tStart Time\tDuration(ms)\tHadoopJobId\tError message\tInitiator host\tInitiator\tPool name",
results.get(0));
Pattern p = Pattern.compile("(1|2)\tdefault\t(compaction_test|table2)\t --- \tMAJOR\tinitiated.*(pool1|default)");
Assert.assertEquals("CompactionId\tDatabase\tTable\tPartition\tType\tState\tWorker host\tWorker\tEnqueue Time\tStart Time\tDuration(ms)" +
"\tHadoopJobId\tError message\tInitiator host\tInitiator\tPool name\tTxnId\tNext TxnId\tCommit Time\tHighest WriteID", results.get(0));
Pattern p = Pattern.compile("(1|2)\tdefault\t(compaction_test|table2)\t --- \tMAJOR\tinitiated.*(pool1|default).*");
for(int i = 1; i < results.size(); i++) {
Assert.assertTrue(p.matcher(results.get(i).toString()).matches());
}
@@ -162,9 +163,10 @@ public void testShowCompactionsRespectPoolName() throws Exception {
List results = new ArrayList();
driver.getResults(results);
Assert.assertEquals(2, results.size());
Assert.assertEquals("CompactionId\tDatabase\tTable\tPartition\tType\tState\tWorker host\tWorker\tEnqueue Time\tStart Time\tDuration(ms)\tHadoopJobId\tError message\tInitiator host\tInitiator\tPool name",
Assert.assertEquals("CompactionId\tDatabase\tTable\tPartition\tType\tState\tWorker host\tWorker\tEnqueue Time\tStart Time\tDuration(ms)" +
"\tHadoopJobId\tError message\tInitiator host\tInitiator\tPool name\tTxnId\tNext TxnId\tCommit Time\tHighest WriteID",
results.get(0));
Pattern p = Pattern.compile("1|2\tdefault\tcompaction_test\t --- \tMAJOR\tinitiated.*pool1");
Pattern p = Pattern.compile("1|2\tdefault\tcompaction_test\t --- \tMAJOR\tinitiated.*pool1.*");
Assert.assertTrue(p.matcher(results.get(1).toString()).matches());
}
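
Side note on the regex change above: the only functional difference is the trailing `.*`. Each row now ends with four extra tab-separated columns (TxnId, Next TxnId, Commit Time, Highest WriteID), so a fully anchored `matches()` call fails without it. A minimal standalone sketch with a hypothetical row value (not taken from the commit) illustrating the effect:

```java
// Standalone sketch (not part of the commit): why the trailing ".*" is needed.
// The row below is hypothetical but follows the masked .q.out layout; after the
// pool name it now carries four extra tab-separated columns.
import java.util.regex.Pattern;

public class TrailingColumnsSketch {
    public static void main(String[] args) {
        String row = "1\tdefault\tcompaction_test\t --- \tMAJOR\tinitiated\t --- \t --- "
                + "\t1690000000000\t --- \t --- \t --- \t --- \thost\tmanual\tpool1"
                + "\t0\t0\t0\t ---";
        Pattern old = Pattern.compile(
                "(1|2)\tdefault\t(compaction_test|table2)\t --- \tMAJOR\tinitiated.*(pool1|default)");
        Pattern updated = Pattern.compile(
                "(1|2)\tdefault\t(compaction_test|table2)\t --- \tMAJOR\tinitiated.*(pool1|default).*");
        System.out.println(old.matcher(row).matches());     // false: the four new columns trail the pool name
        System.out.println(updated.matcher(row).matches()); // true
    }
}
```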

@@ -231,12 +231,12 @@ public void testMinorCompactionShouldBeRefusedOnTablesWithOriginalFiles() throws
if (2 != compacts.size()) {
Assert.fail("Expecting 2 rows and found " + compacts.size() + " files " + compacts);
}
Assert.assertEquals("did not initiate", compacts.get(0).getState());
Assert.assertEquals("refused", compacts.get(0).getState());
Assert.assertTrue(compacts.get(0).getErrorMessage()
.startsWith("Caught exception while trying to determine if we should compact"));
Assert.assertEquals("refused", compacts.get(1).getState());
Assert.assertTrue(compacts.get(1).getErrorMessage()
.startsWith("Query based Minor compaction is not possible for full acid tables having raw format (non-acid) data in them."));
Assert.assertEquals("did not initiate", compacts.get(1).getState());
Assert.assertTrue(compacts.get(1).getErrorMessage()
.startsWith("Caught exception while trying to determine if we should compact "));
}

@Test
@@ -301,10 +301,10 @@ public void testMinorCompactionShouldBeRefusedOnTablesWithRawData() throws Excep
if (2 != compacts.size()) {
Assert.fail("Expecting 2 rows and found " + compacts.size() + " files " + compacts);
}
Assert.assertEquals("did not initiate", compacts.get(0).getState());
Assert.assertTrue(compacts.get(0).getErrorMessage().startsWith("Caught exception while trying to determine if we should compact"));
Assert.assertEquals("refused", compacts.get(1).getState());
Assert.assertTrue(compacts.get(1).getErrorMessage().startsWith("Query based Minor compaction is not possible for full acid tables having raw format (non-acid) data in them."));
Assert.assertEquals("refused", compacts.get(0).getState());
Assert.assertTrue(compacts.get(0).getErrorMessage().startsWith("Query based Minor compaction is not possible for full acid tables having raw format (non-acid) data in them."));
Assert.assertEquals("did not initiate", compacts.get(1).getState());
Assert.assertTrue(compacts.get(1).getErrorMessage().startsWith("Caught exception while trying to determine if we should compact"));
}

/**
@@ -33,8 +33,8 @@ public class ShowCompactionsDesc implements DDLDesc, Serializable {

// @formatter:off
public static final String SCHEMA =
"compactionid,dbname,tabname,partname,type,state,workerhost,workerid,enqueuetime,starttime,duration,hadoopjobid,errormessage,initiatorhost,initiatorid,poolname#" +
"string:string:string:string:string:string:string:string:string:string:string:string:string:string:string:string";
"compactionid,dbname,tabname,partname,type,state,workerhost,workerid,enqueuetime,starttime,duration,hadoopjobid,errormessage,initiatorhost,initiatorid,poolname,txnid,nexttxnid,committime,hightestwriteid#" +
"string:string:string:string:string:string:string:string:string:string:string:string:string:string:string:string:string:string:string:string";
// @formatter:on

private String resFile;
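
The SCHEMA constant appears to follow a `names#types` convention: comma-separated column names before the `#`, colon-separated column types after it. A minimal standalone sketch, assuming only that convention, checking that the updated string stays balanced at 20 names and 20 types:

```java
// Standalone sketch (not part of the commit): sanity-checking the updated SCHEMA
// string against the apparent "names#types" convention. "hightestwriteid" is
// copied verbatim from the commit.
import java.util.Arrays;

public class SchemaSketch {
    static final String SCHEMA =
            "compactionid,dbname,tabname,partname,type,state,workerhost,workerid,enqueuetime,starttime,"
          + "duration,hadoopjobid,errormessage,initiatorhost,initiatorid,poolname,"
          + "txnid,nexttxnid,committime,hightestwriteid#"
          + "string:string:string:string:string:string:string:string:string:string:"
          + "string:string:string:string:string:string:string:string:string:string";

    public static void main(String[] args) {
        String[] halves = SCHEMA.split("#");
        String[] names = halves[0].split(",");
        String[] types = halves[1].split(":");
        System.out.println(names.length + " names, " + types.length + " types"); // 20 names, 20 types
        System.out.println(Arrays.asList(names).subList(16, 20));
        // [txnid, nexttxnid, committime, hightestwriteid] -- the four new columns
    }
}
```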
@@ -100,6 +100,14 @@ private void writeHeader(DataOutputStream os) throws IOException {
os.writeBytes("Initiator");
os.write(Utilities.tabCode);
os.writeBytes("Pool name");
os.write(Utilities.tabCode);
os.writeBytes("TxnId");
os.write(Utilities.tabCode);
os.writeBytes("Next TxnId");
os.write(Utilities.tabCode);
os.writeBytes("Commit Time");
os.write(Utilities.tabCode);
os.writeBytes("Highest WriteID");
os.write(Utilities.newLineCode);
}

@@ -137,6 +145,14 @@ private void writeRow(DataOutputStream os, ShowCompactResponseElement e) throws
os.writeBytes(getThreadIdFromId(e.getInitiatorId()));
os.write(Utilities.tabCode);
os.writeBytes(e.getPoolName());
os.write(Utilities.tabCode);
os.writeBytes(e.isSetTxnId() ? Long.toString(e.getTxnId()) : NO_VAL);
os.write(Utilities.tabCode);
os.writeBytes(e.isSetNextTxnId() ? Long.toString(e.getNextTxnId()) : NO_VAL);
os.write(Utilities.tabCode);
os.writeBytes(e.isSetCommitTime() ? Long.toString(e.getCommitTime()) : NO_VAL);
os.write(Utilities.tabCode);
os.writeBytes(e.isSetHightestWriteId() ? Long.toString(e.getHightestWriteId()) : NO_VAL);
os.write(Utilities.newLineCode);
}
}
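
For illustration, the four new fields are Thrift optionals, so writeRow guards each with an isSet check and falls back to a placeholder when unset. A minimal standalone sketch of that pattern; the TAB/NEWLINE bytes and the NO_VAL text are assumed stand-ins for Utilities.tabCode, Utilities.newLineCode, and the formatter's real constant:

```java
// Standalone sketch (not part of the commit): the isSet-guarded serialization
// pattern writeRow uses for the new optional fields. The placeholder text and
// byte codes are assumed stand-ins, not the real constants.
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class WriteRowSketch {
    private static final byte TAB = '\t';         // stand-in for Utilities.tabCode
    private static final byte NEWLINE = '\n';     // stand-in for Utilities.newLineCode
    private static final String NO_VAL = " --- "; // assumed placeholder value

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        DataOutputStream os = new DataOutputStream(buf);
        boolean txnIdSet = true;           // e.isSetTxnId()
        long txnId = 17L;                  // e.getTxnId()
        boolean highestWriteIdSet = false; // field never populated for this row
        os.writeBytes(txnIdSet ? Long.toString(txnId) : NO_VAL);
        os.write(TAB);
        os.writeBytes(highestWriteIdSet ? "42" : NO_VAL);
        os.write(NEWLINE);
        System.out.print(buf); // 17\t --- \n : unset optional fields render as the placeholder
    }
}
```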
ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java: 55 additions & 0 deletions
@@ -3356,6 +3356,61 @@ public void testCompactionOutputDirectoryNamesOnPartitionsAndOldDeltasDeleted()
Assert.assertFalse(fs.exists(new Path(tablePath + oldDelta4)));
}

@Test
public void testShowCompactionOrder() throws Exception {

d.destroy();
hiveConf.setVar(HiveConf.ConfVars.DYNAMICPARTITIONINGMODE, "nonstrict");
d = new Driver(hiveConf);
//generate some compaction history
runStatementOnDriver("drop database if exists mydb1 cascade");
runStatementOnDriver("create database mydb1");

runStatementOnDriver("create table mydb1.tbl0 " + "(a int, b int) partitioned by (p string) clustered by (a) into " +
BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='true')");
runStatementOnDriver("insert into mydb1.tbl0" + " PARTITION(p) " +
" values(1,2,'p1'),(3,4,'p1'),(1,2,'p2'),(3,4,'p2'),(1,2,'p3'),(3,4,'p3')");
runStatementOnDriver("alter table mydb1.tbl0" + " PARTITION(p='p1') compact 'MAJOR'");
runStatementOnDriver("alter table mydb1.tbl0" + " PARTITION(p='p2') compact 'MAJOR'");
TestTxnCommands2.runWorker(hiveConf);
TestTxnCommands2.runCleaner(hiveConf);
runStatementOnDriver("alter table mydb1.tbl0" + " PARTITION(p='p3') compact 'MAJOR'");
runStatementOnDriver("insert into mydb1.tbl0" + " PARTITION(p) " +
" values(4,5,'p1'),(6,7,'p1'),(4,5,'p2'),(6,7,'p2'),(4,5,'p3'),(6,7,'p3')");
TestTxnCommands2.runWorker(hiveConf);
TestTxnCommands2.runCleaner(hiveConf);
runStatementOnDriver("insert into mydb1.tbl0" + " PARTITION(p) " +
" values(11,12,'p1'),(13,14,'p1'),(11,12,'p2'),(13,14,'p2'),(11,12,'p3'),(13,14,'p3')");
runStatementOnDriver("alter table mydb1.tbl0" + " PARTITION (p='p1') compact 'MINOR'");
TestTxnCommands2.runWorker(hiveConf);

runStatementOnDriver("create table mydb1.tbl1 " + "(a int, b int) partitioned by (ds string) clustered by (a) into " +
BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='true')");
runStatementOnDriver("insert into mydb1.tbl1" + " PARTITION(ds) " +
" values(1,2,'today'),(3,4,'today'),(1,2,'tomorrow'),(3,4,'tomorrow'),(1,2,'yesterday'),(3,4,'yesterday')");
runStatementOnDriver("alter table mydb1.tbl1" + " PARTITION(ds='today') compact 'MAJOR'");
TestTxnCommands2.runWorker(hiveConf);

runStatementOnDriver("create table T (a int, b int) stored as orc TBLPROPERTIES ('transactional'='true')");
runStatementOnDriver("insert into T values(0,2)");//makes delta_1_1 in T1
runStatementOnDriver("insert into T values(1,4)");//makes delta_2_2 in T2

//create failed compaction attempt so that compactor txn is aborted
HiveConf.setBoolVar(hiveConf, HiveConf.ConfVars.HIVETESTMODEFAILCOMPACTION, true);
runStatementOnDriver("alter table T compact 'minor'");
TestTxnCommands2.runWorker(hiveConf);
// Verify compaction order
List<ShowCompactResponseElement> compacts =
txnHandler.showCompact(new ShowCompactRequest()).getCompacts();
Assert.assertEquals(6, compacts.size());
Assert.assertEquals(TxnStore.INITIATED_RESPONSE, compacts.get(0).getState());
Assert.assertEquals(TxnStore.REFUSED_RESPONSE, compacts.get(1).getState());
Assert.assertEquals(TxnStore.CLEANING_RESPONSE, compacts.get(2).getState());
Assert.assertEquals(TxnStore.CLEANING_RESPONSE, compacts.get(3).getState());
Assert.assertEquals(TxnStore.SUCCEEDED_RESPONSE, compacts.get(4).getState());
Assert.assertEquals(TxnStore.REFUSED_RESPONSE, compacts.get(5).getState());

}
private void compactPartition(String table, CompactionType type, String partition)
throws Exception {
CompactionRequest compactionRequest = new CompactionRequest("default", table, type);
@@ -53,9 +53,9 @@ PREHOOK: query: show compactions
PREHOOK: type: SHOW COMPACTIONS
POSTHOOK: query: show compactions
POSTHOOK: type: SHOW COMPACTIONS
CompactionId Database Table Partition Type State Worker host Worker Enqueue Time Start Time Duration(ms) HadoopJobId Error message Initiator host Initiator Pool name
1 default t1_n153 --- MAJOR initiated --- --- #Masked# --- --- --- --- #Masked# manual default
2 default t2_n153 --- MINOR initiated --- --- #Masked# --- --- --- --- #Masked# manual default
CompactionId Database Table Partition Type State Worker host Worker Enqueue Time Start Time Duration(ms) HadoopJobId Error message Initiator host Initiator Pool name TxnId Next TxnId Commit Time Highest WriteID
1 default t1_n153 --- MAJOR initiated --- --- #Masked# --- --- --- --- #Masked# manual default 0 0 0 ---
2 default t2_n153 --- MINOR initiated --- --- #Masked# --- --- --- --- #Masked# manual default 0 0 0 ---
PREHOOK: query: drop table T1_n153
PREHOOK: type: DROPTABLE
PREHOOK: Input: default@t1_n153
@@ -149,8 +149,8 @@ PREHOOK: query: show compactions
PREHOOK: type: SHOW COMPACTIONS
POSTHOOK: query: show compactions
POSTHOOK: type: SHOW COMPACTIONS
CompactionId Database Table Partition Type State Worker host Worker Enqueue Time Start Time Duration(ms) HadoopJobId Error message Initiator host Initiator Pool name
1 default partitioned_acid_table p=abc MINOR initiated --- --- #Masked# --- --- --- --- #Masked# manual default
CompactionId Database Table Partition Type State Worker host Worker Enqueue Time Start Time Duration(ms) HadoopJobId Error message Initiator host Initiator Pool name TxnId Next TxnId Commit Time Highest WriteID
1 default partitioned_acid_table p=abc MINOR initiated --- --- #Masked# --- --- --- --- #Masked# manual default 0 0 0 ---
PREHOOK: query: drop table partitioned_acid_table
PREHOOK: type: DROPTABLE
PREHOOK: Input: default@partitioned_acid_table

Some generated files are not rendered by default.