Skip to content

Commit

Permalink
[Issue #32]: fix listener. (#34)
Browse files Browse the repository at this point in the history
Upgrade listener to be compatible with Trino SPI.
  • Loading branch information
bianhq authored Jul 10, 2022
1 parent d272c03 commit c79757d
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 138 deletions.
190 changes: 54 additions & 136 deletions listener/src/main/java/io/pixelsdb/pixels/trino/PixelsEventListener.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,32 +19,27 @@
*/
package io.pixelsdb.pixels.trino;

import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.parser.ParserConfig;
import io.trino.spi.TrinoException;
import io.trino.spi.eventlistener.EventListener;
import io.trino.spi.eventlistener.QueryCompletedEvent;
import io.trino.spi.eventlistener.QueryCreatedEvent;
import io.trino.spi.eventlistener.SplitCompletedEvent;
import io.airlift.log.Logger;
import io.pixelsdb.pixels.common.utils.ConfigFactory;
import io.pixelsdb.pixels.common.utils.DateUtil;
import io.pixelsdb.pixels.common.utils.HttpUtil;
import io.pixelsdb.pixels.trino.exception.ListenerExecption;
import io.pixelsdb.pixels.trino.exception.ListenerException;
import io.trino.spi.TrinoException;
import io.trino.spi.eventlistener.*;

import java.io.BufferedWriter;
import java.io.FileWriter;
import java.io.IOException;
import java.util.List;

import static io.pixelsdb.pixels.trino.exception.ListenerErrorCode.PIXELS_EVENT_LISTENER_ERROR;
import static io.pixelsdb.pixels.trino.exception.ListenerErrorCode.PIXELS_EVENT_LISTENER_METRIC_ERROR;

/**
* @author hank
*/
public class PixelsEventListener implements EventListener
{
private static Logger logger = Logger.get(PixelsEventListener.class);
private static final Logger logger = Logger.get(PixelsEventListener.class);

private final String logDir;
private final boolean enabled;
Expand Down Expand Up @@ -78,19 +73,21 @@ public PixelsEventListener (String logDir, boolean enabled,
this.queryType = queryType;
try
{
if (this.enabled == true && LogWriter == null)
if (this.enabled && LogWriter == null)
{
LogWriter = new BufferedWriter(new FileWriter(
this.logDir + "pixels_query_" +
this.logDir + "pixels_trino_query" +
DateUtil.getCurTime() + ".log", true));
LogWriter.write("\"query id\",\"user\",\"elapsed (ms)\",\"execution (ms)\",\"read throughput (MB)\",gc time (ms)");
LogWriter.write("\"query id\",\"user\",\"wall(ms)\",\"rs waiting(ms)\",\"queued(ms)\"," +
"\"planning(ms)\",\"execution(ms)\",\"read throughput(MB)\",\"local gc time(ms)\"," +
"\"full gc tasks\",\"avg full gc time(s)\"");
LogWriter.newLine();
LogWriter.flush();
}
} catch (IOException e)
{
throw new TrinoException(PIXELS_EVENT_LISTENER_ERROR,
new ListenerExecption("can not create log writer."));
new ListenerException("can not create log writer."));
}
}

Expand All @@ -102,7 +99,7 @@ public void queryCreated(QueryCreatedEvent queryCreatedEvent)
@Override
public void queryCompleted(QueryCompletedEvent queryCompletedEvent)
{
if (this.enabled == false)
if (!this.enabled)
{
return;
}
Expand All @@ -129,59 +126,69 @@ public void queryCompleted(QueryCompletedEvent queryCompletedEvent)

/**
* Issue #132:
* TODO: add full gc count and time to the output.
* TODO: add cpu and memory statistics to the output.
* TODO: make use of resource estimates.
*/
String queryId = queryCompletedEvent.getMetadata().getQueryId();
String user = queryCompletedEvent.getContext().getUser();
if (queryCompletedEvent.getContext().getSchema().isEmpty())
{
logger.error("can not write log in pixels event listener.");
logger.info("query id: " + queryId + ", user: " + user);
return;
}
String schema = queryCompletedEvent.getContext().getSchema().get();
String uri = queryCompletedEvent.getMetadata().getUri().toString();
if (schema.equalsIgnoreCase(this.schema))
{
if (this.userPrefix.equals("none") ||
(!this.userPrefix.equals("none") && user.startsWith(this.userPrefix)))
if (this.userPrefix.equals("none") || user.startsWith(this.userPrefix))
{
try
{
String content = HttpUtil.GetContentByGet(uri.toString());
JSONObject object = JSONObject.parseObject(content);
String query = object.getString("query");
String query = queryCompletedEvent.getMetadata().getQuery();
if (query.toLowerCase().contains(this.queryType.toLowerCase()))
{
JSONObject statsObject = object.getJSONObject("queryStats");
double elapsed = this.parseElapsedToMillis(statsObject.getString("elapsedTime"));
double queued = this.parseElapsedToMillis(statsObject.getString("queuedTime"));
double analysis = this.parseElapsedToMillis(statsObject.getString("analysisTime"));
double planning = this.parseElapsedToMillis(statsObject.getString("totalPlanningTime"));
double finishing = this.parseElapsedToMillis(statsObject.getString("finishingTime"));
double inputDataSize = this.parseDataSizeToMB(statsObject.getString("rawInputDataSize"));
if (elapsed < 0 || queued < 0 || analysis < 0 ||
planning < 0 || finishing < 0 || inputDataSize < 0)
QueryStatistics stats = queryCompletedEvent.getStatistics();
long execution = -1;
if (stats.getExecutionTime().isPresent())
{
execution = stats.getExecutionTime().get().toMillis();
}
long queued = stats.getQueuedTime().toMillis();
long planning = -1;
if (stats.getPlanningTime().isPresent())
{
throw new ListenerExecption("elapsedTime:" + statsObject.getString("elapsedTime") +
",queuedTime:" + statsObject.getString("queuedTime") +
",analysisTime:" + statsObject.getString("analysisTime") +
",totalPlanningTime:" + statsObject.getString("totalPlanningTime") +
",finishingTime:" + statsObject.getString("finishingTime") +
",rawInputDataSize:" + statsObject.getString("rawInputDataSize")
);
planning = stats.getPlanningTime().get().toMillis();
}
double execution = elapsed - queued - analysis - planning - finishing;
double throughput = inputDataSize / execution * 1000;
LogWriter.write(queryId + "," + user + "," + elapsed + "," + execution + "," + throughput + "," + (gcms>=0 ? gcms : "na"));
long rsWaiting = -1;
if (stats.getResourceWaitingTime().isPresent())
{
rsWaiting = stats.getResourceWaitingTime().get().toMillis();
}
long wall = stats.getWallTime().toMillis();
double inputDataSize = stats.getPhysicalInputBytes();
double throughput = inputDataSize / execution / 1024;

double totalGcSec = 0;
int gcTasks = 0;
int tasks = 0;
List<StageGcStatistics> stageGcStats = stats.getStageGcStatistics();
for (StageGcStatistics gcStats : stageGcStats)
{
totalGcSec += gcStats.getTotalFullGcSec();
gcTasks += gcStats.getFullGcTasks();
tasks += gcStats.getTasks();
}

LogWriter.write(queryId + "," + user + "," + wall + "," + rsWaiting + "," +
queued + "," + planning + "," + execution + "," + throughput + "," +
(gcms>=0 ? gcms : "na") + "," + gcTasks + "," + (tasks > 0 ? totalGcSec/tasks : "na"));
LogWriter.newLine();
LogWriter.flush();
}
} catch (IOException e)
{
logger.error("can not write log in pixels event listener.");
logger.info("query id: " + queryId + ", user: " + user + ", uri: " + uri.toString());
} catch (ListenerExecption e)
{
logger.error("can not parse metrics in presto json.");
logger.info("query id: " + queryId + ", user: " + user + ", uri: " + uri.toString());
throw new TrinoException(PIXELS_EVENT_LISTENER_METRIC_ERROR, e);
logger.info("query id: " + queryId + ", user: " + user);
}
}
}
Expand All @@ -191,93 +198,4 @@ public void queryCompleted(QueryCompletedEvent queryCompletedEvent)
public void splitCompleted(SplitCompletedEvent splitCompletedEvent)
{
}

/**
* parse elapsed milli seconds from string.
* return -1 if the string can not be parsed.
* @param str
* @return
*/
private double parseElapsedToMillis (String str)
{
if (str == null)
{
return 0;
}

if (str.endsWith("ns"))
{
return Double.parseDouble(str.substring(0, str.indexOf("ns"))) / 1000 / 1000;
}
else if (str.endsWith("us"))
{
return Double.parseDouble(str.substring(0, str.indexOf("us"))) / 1000;
}
else if (str.endsWith("ms"))
{
return Double.parseDouble(str.substring(0, str.indexOf("ms")));
}
else if (str.endsWith("s"))
{
return Double.parseDouble(str.substring(0, str.indexOf("s"))) * 1000;
}
else if (str.endsWith("m"))
{
return Double.parseDouble(str.substring(0, str.indexOf("m"))) * 60 * 1000;
}
else if (str.endsWith("h"))
{
return Double.parseDouble(str.substring(0, str.indexOf("h"))) * 60 * 60 * 1000;
}
else
{
return -1;
}
}

/**
* parse the megabytes from string.
* return -1 if the string can not be parsed.
* @param str
* @return
*/
private double parseDataSizeToMB (String str)
{
if (str == null)
{
return 0;
}

if (str.endsWith("KB"))
{
return Double.parseDouble(str.substring(0, str.indexOf("KB"))) / 1024;
}
else if (str.endsWith("MB"))
{
return Double.parseDouble(str.substring(0, str.indexOf("MB")));
}
else if (str.endsWith("GB"))
{
return Double.parseDouble(str.substring(0, str.indexOf("GB"))) * 1024;
}
else if (str.endsWith("TB"))
{
return Double.parseDouble(str.substring(0, str.indexOf("TB"))) * 1024 * 1024;
}
else if (str.endsWith("B"))
{
return Double.parseDouble(str.substring(0, str.indexOf("B"))) / 1024 / 1024;
}
else
{
return -1;
}
}

@Override
protected void finalize() throws Throwable
{
super.finalize();
LogWriter.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@
/**
* @author hank
*/
public class ListenerExecption extends Exception
public class ListenerException extends Exception
{
public ListenerExecption(String msg)
public ListenerException(String msg)
{
super(msg);
}
Expand Down

0 comments on commit c79757d

Please # to comment.