Skip to content

Commit ed03e29

Browse files
committed
feat(pb): add runner logs
1 parent d0f985a commit ed03e29

File tree

6 files changed

+40
-5
lines changed

6 files changed

+40
-5
lines changed

packages/edge/infra/client/container-runner/src/log_shipper.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ pub struct LogShipper {
3737
pub vector_socket_addr: String,
3838

3939
pub runner_id: String,
40+
pub actor_id: Option<String>,
4041
}
4142

4243
impl LogShipper {
@@ -91,7 +92,7 @@ impl LogShipper {
9192
while let Result::Ok(message) = self.msg_rx.recv() {
9293
let vector_message = VectorMessage::Runners {
9394
runner_id: self.runner_id.as_str(),
94-
task: "main", // Backwards compatibility with logs
95+
actor_id: self.actor_id.as_ref().map(|x| x.as_str()),
9596
stream_type: message.stream_type as u8,
9697
ts: message.ts,
9798
message: message.message.as_str(),
@@ -114,7 +115,7 @@ enum VectorMessage<'a> {
114115
#[serde(rename = "runners")]
115116
Runners {
116117
runner_id: &'a str,
117-
task: &'a str,
118+
actor_id: Option<&'a str>,
118119
stream_type: u8,
119120
ts: u64,
120121
message: &'a str,

packages/edge/infra/client/container-runner/src/main.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@ fn main() -> Result<()> {
3737
.transpose()
3838
.context("failed to parse vector socket addr")?;
3939
let runner_id = var("RUNNER_ID")?;
40+
// Only set if this is a single allocation runner (one actor running on it)
41+
let actor_id = var("ACTOR_ID").ok();
4042

4143
let (shutdown_tx, shutdown_rx) = mpsc::sync_channel(1);
4244

@@ -49,6 +51,7 @@ fn main() -> Result<()> {
4951
msg_rx,
5052
vector_socket_addr,
5153
runner_id,
54+
actor_id,
5255
};
5356
let log_shipper_thread = log_shipper.spawn();
5457
(Some(msg_tx), Some(log_shipper_thread))

packages/edge/infra/client/manager/src/actor/mod.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,9 +114,15 @@ impl Actor {
114114
.context("should have runner config")?
115115
{
116116
protocol::ActorRunner::New { .. } => {
117+
let actor_id = matches!(
118+
self.runner.config().image.allocation_type,
119+
protocol::ImageAllocationType::Single
120+
)
121+
.then_some(self.actor_id);
122+
117123
// Because the runner is not already started we can get the ports here instead of reading from
118124
// sqlite
119-
let ports = self.runner.start(ctx).await?;
125+
let ports = self.runner.start(ctx, actor_id).await?;
120126

121127
let pid = self.runner.pid().await?;
122128

packages/edge/infra/client/manager/src/runner/mod.rs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -243,9 +243,12 @@ impl Runner {
243243
Ok(())
244244
}
245245

246+
// `actor_id` is set if this runner has a single allocation type which means there is only one actor
247+
// runner on it
246248
pub async fn start(
247249
self: &Arc<Self>,
248250
ctx: &Arc<Ctx>,
251+
actor_id: Option<Uuid>,
249252
) -> Result<protocol::HashableMap<String, protocol::ProxiedPort>> {
250253
tracing::info!(runner_id=?self.runner_id, "starting");
251254

@@ -298,7 +301,7 @@ impl Runner {
298301
let self2 = self.clone();
299302
let ctx2 = ctx.clone();
300303
tokio::spawn(async move {
301-
match self2.run(&ctx2).await {
304+
match self2.run(&ctx2, actor_id).await {
302305
Ok(_) => {
303306
if let Err(err) = self2.observe(&ctx2, false).await {
304307
tracing::error!(runner_id=?self2.runner_id, ?err, "observe failed");
@@ -318,7 +321,7 @@ impl Runner {
318321
Ok(proxied_ports)
319322
}
320323

321-
async fn run(&self, ctx: &Ctx) -> Result<()> {
324+
async fn run(&self, ctx: &Ctx, actor_id: Option<Uuid>) -> Result<()> {
322325
// NOTE: This is the env that goes to the container-runner process, NOT the env that in inserted into
323326
// the container.
324327
let mut runner_env = vec![
@@ -328,6 +331,11 @@ impl Runner {
328331
),
329332
("RUNNER_ID", self.runner_id.to_string()),
330333
];
334+
335+
if let Some(actor_id) = actor_id {
336+
runner_env.push(("ACTOR_ID", actor_id.to_string()));
337+
}
338+
331339
if let Some(vector) = &ctx.config().vector {
332340
runner_env.push(("VECTOR_SOCKET_ADDR", vector.address.to_string()));
333341
}

packages/edge/services/pegboard/db/runner-log/migrations/20200101000000_init.down.sql

Whitespace-only changes.
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
2+
CREATE TABLE IF NOT EXISTS runner_logs (
3+
runner_id UUID,
4+
actor_id UUID, -- When not set will be the NIL UUID (all zeros)
5+
stream_type UInt8, -- pegboard::types::LogsStreamType
6+
ts DateTime64 (9),
7+
message String
8+
) ENGINE = ReplicatedMergeTree ()
9+
PARTITION BY
10+
toStartOfHour (ts)
11+
ORDER BY (
12+
runner_id,
13+
toUnixTimestamp (ts),
14+
stream_type
15+
)
16+
TTL toDate (ts + toIntervalDay (3))
17+
SETTINGS index_granularity = 8192, ttl_only_drop_parts = 1;

0 commit comments

Comments
 (0)