Skip to content

Commit

Permalink
Merge pull request #192 from tansu-io/191-deleting-a-topic-with-confi…
Browse files Browse the repository at this point in the history
…guration-causing-an-error-with-pg-storage

fix: pg constraint while deleting topic with configuration
  • Loading branch information
shortishly authored Jan 17, 2025
2 parents db6677b + d07024b commit 584af07
Show file tree
Hide file tree
Showing 4 changed files with 271 additions and 3 deletions.
212 changes: 212 additions & 0 deletions tansu-server/tests/topic.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,212 @@
// Copyright ⓒ 2025 Peter Morgan <peter.james.morgan@gmail.com>
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

use common::{alphanumeric_string, register_broker};
use tansu_kafka_sans_io::{
create_topics_request::{CreatableTopic, CreatableTopicConfig},
ErrorCode,
};
use tansu_server::Result;
use tansu_storage::{Storage, StorageContainer, TopicId};
use tracing::debug;
use uuid::Uuid;

pub mod common;

pub async fn create_delete(
cluster_id: Uuid,
broker_id: i32,
mut sc: StorageContainer,
) -> Result<()> {
register_broker(&cluster_id, broker_id, &mut sc).await?;

let topic_name: String = alphanumeric_string(15);
debug!(?topic_name);

let num_partitions = 6;
let replication_factor = 0;

let assignments = Some([].into());
let configs = Some([].into());

let topic_id = sc
.create_topic(
CreatableTopic {
name: topic_name.clone(),
num_partitions,
replication_factor,
assignments: assignments.clone(),
configs: configs.clone(),
},
false,
)
.await?;
debug!(?topic_id);

assert_eq!(
ErrorCode::None,
sc.delete_topic(&TopicId::from(topic_id)).await?
);

Ok(())
}

pub async fn create_with_config_delete(
cluster_id: Uuid,
broker_id: i32,
mut sc: StorageContainer,
) -> Result<()> {
register_broker(&cluster_id, broker_id, &mut sc).await?;

let topic_name: String = alphanumeric_string(15);
debug!(?topic_name);

let num_partitions = 6;
let replication_factor = 0;

let assignments = Some([].into());
let configs = Some(
[CreatableTopicConfig {
name: "xyz".into(),
value: Some("12321".into()),
}]
.into(),
);

let topic_id = sc
.create_topic(
CreatableTopic {
name: topic_name.clone(),
num_partitions,
replication_factor,
assignments: assignments.clone(),
configs: configs.clone(),
},
false,
)
.await?;
debug!(?topic_id);

assert_eq!(
ErrorCode::None,
sc.delete_topic(&TopicId::from(topic_id)).await?
);

Ok(())
}

mod pg {
use common::{init_tracing, StorageType};
use rand::{prelude::*, thread_rng};
use url::Url;

use super::*;

fn storage_container(cluster: impl Into<String>, node: i32) -> Result<StorageContainer> {
Url::parse("tcp://127.0.0.1/")
.map_err(Into::into)
.and_then(|advertised_listener| {
common::storage_container(
StorageType::Postgres,
cluster,
node,
advertised_listener,
None,
)
})
}

#[tokio::test]
async fn create_delete() -> Result<()> {
let _guard = init_tracing()?;

let cluster_id = Uuid::now_v7();
let broker_id = thread_rng().gen_range(0..i32::MAX);

super::create_delete(
cluster_id,
broker_id,
storage_container(cluster_id, broker_id)?,
)
.await
}

#[tokio::test]
async fn create_with_config_delete() -> Result<()> {
let _guard = init_tracing()?;

let cluster_id = Uuid::now_v7();
let broker_id = thread_rng().gen_range(0..i32::MAX);

super::create_with_config_delete(
cluster_id,
broker_id,
storage_container(cluster_id, broker_id)?,
)
.await
}
}

mod in_memory {
use common::{init_tracing, StorageType};
use rand::{prelude::*, thread_rng};
use url::Url;

use super::*;

fn storage_container(cluster: impl Into<String>, node: i32) -> Result<StorageContainer> {
Url::parse("tcp://127.0.0.1/")
.map_err(Into::into)
.and_then(|advertised_listener| {
common::storage_container(
StorageType::InMemory,
cluster,
node,
advertised_listener,
None,
)
})
}

#[tokio::test]
async fn create_delete() -> Result<()> {
let _guard = init_tracing()?;

let cluster_id = Uuid::now_v7();
let broker_id = thread_rng().gen_range(0..i32::MAX);

super::create_delete(
cluster_id,
broker_id,
storage_container(cluster_id, broker_id)?,
)
.await
}

#[tokio::test]
async fn create_with_config_delete() -> Result<()> {
let _guard = init_tracing()?;

let cluster_id = Uuid::now_v7();
let broker_id = thread_rng().gen_range(0..i32::MAX);

super::create_with_config_delete(
cluster_id,
broker_id,
storage_container(cluster_id, broker_id)?,
)
.await
}
}
34 changes: 32 additions & 2 deletions tansu-server/tests/txn.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright ⓒ 2024 Peter Morgan <peter.james.morgan@gmail.com>
// Copyright ⓒ 2024-2025 Peter Morgan <peter.james.morgan@gmail.com>
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
Expand Down Expand Up @@ -30,7 +30,7 @@ use tansu_kafka_sans_io::{
};
use tansu_server::Result;
use tansu_storage::{
ListOffsetRequest, Storage, StorageContainer, Topition, TxnAddPartitionsRequest,
ListOffsetRequest, Storage, StorageContainer, TopicId, Topition, TxnAddPartitionsRequest,
TxnOffsetCommitRequest,
};
use tracing::{debug, error};
Expand Down Expand Up @@ -153,6 +153,11 @@ pub async fn simple_txn_commit_offset_commit(
assert!(offsets.contains_key(&topition));
assert_eq!(Some(&committed_offset), offsets.get(&topition));

assert_eq!(
ErrorCode::None,
sc.delete_topic(&TopicId::from(topic_id)).await?
);

Ok(())
}

Expand Down Expand Up @@ -270,6 +275,11 @@ pub async fn simple_txn_commit_offset_abort(
assert!(offsets.contains_key(&topition));
assert_eq!(Some(&-1), offsets.get(&topition));

assert_eq!(
ErrorCode::None,
sc.delete_topic(&TopicId::from(topic_id)).await?
);

Ok(())
}

Expand Down Expand Up @@ -553,6 +563,11 @@ pub async fn simple_txn_produce_commit(
assert_eq!(Some(offset), list_offsets_after[0].1.offset);
}

assert_eq!(
ErrorCode::None,
sc.delete_topic(&TopicId::from(topic_id)).await?
);

Ok(())
}

Expand Down Expand Up @@ -835,6 +850,11 @@ pub async fn simple_txn_produce_abort(
assert_eq!(Some(offset), list_offsets_after[0].1.offset);
}

assert_eq!(
ErrorCode::None,
sc.delete_topic(&TopicId::from(topic_id)).await?
);

Ok(())
}

Expand Down Expand Up @@ -1200,6 +1220,11 @@ pub async fn with_overlap(
assert_eq!(Some(offset), list_offsets_after[0].1.offset);
}

assert_eq!(
ErrorCode::None,
sc.delete_topic(&TopicId::from(topic_id)).await?
);

Ok(())
}

Expand Down Expand Up @@ -1514,6 +1539,11 @@ pub async fn init_producer_twice(
batches[0].records[0].value
);

assert_eq!(
ErrorCode::None,
sc.delete_topic(&TopicId::from(topic_id)).await?
);

Ok(())
}

Expand Down
6 changes: 5 additions & 1 deletion tansu-storage/src/pg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1147,9 +1147,13 @@ impl Storage for Postgres {

for (description, sql) in [
(
"consumer offsets",
"consumer_offsets",
include_sql!("pg/consumer_offset_delete_by_topic.sql"),
),
(
"topic_configuration",
include_sql!("pg/topic_configuration_delete_by_topic.sql"),
),
(
"watermarks",
include_sql!("pg/watermark_delete_by_topic.sql"),
Expand Down
22 changes: 22 additions & 0 deletions tansu-storage/src/pg/topic_configuration_delete_by_topic.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
-- -*- mode: sql; sql-product: postgres; -*-
-- Copyright ⓒ 2025 Peter Morgan <peter.james.morgan@gmail.com>
--
-- This program is free software: you can redistribute it and/or modify
-- it under the terms of the GNU Affero General Public License as
-- published by the Free Software Foundation, either version 3 of the
-- License, or (at your option) any later version.
--
-- This program is distributed in the hope that it will be useful,
-- but WITHOUT ANY WARRANTY; without even the implied warranty of
-- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-- GNU Affero General Public License for more details.
--
-- You should have received a copy of the GNU Affero General Public License
-- along with this program. If not, see <https://www.gnu.org/licenses/>.

delete from topic_configuration
using cluster c, topic t
where c.name = $1
and t.name = $2
and t.cluster = c.id
and topic_configuration.topic = t.id;

0 comments on commit 584af07

Please # to comment.