Skip to content

Commit

Permalink
NIFI-14236 Implement consistent hash for Attribute Partitioner (#9709)
Browse files Browse the repository at this point in the history
* NIFI-14236 Implemented consistent hash for Attribute Partitioner
- Used implementation from com.google.common.hash.Hashing.consistentHash method
  • Loading branch information
exceptionfactory authored Feb 10, 2025
1 parent 6dee8df commit 9b84dd4
Show file tree
Hide file tree
Showing 4 changed files with 169 additions and 3 deletions.
7 changes: 7 additions & 0 deletions nifi-assembly/NOTICE
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,13 @@ This includes derived works from Dropwizard Metrics available under Apache Softw
nifi-commons/nifi-metrics/src/main/java/org/apache/nifi/metrics/jvm/JvmMetrics.java
nifi-commons/nifi-metrics/src/main/java/org/apache/nifi/metrics/jvm/JmxJvmMetrics.java

This product includes derived works from Google Guava (ASLv2 licensed)
Copyright 2011 The Guava Authors
The derived work is adapted from the class com.google.common.hash.Hashing:
https://github.com/google/guava/blob/master/guava/src/com/google/common/hash/Hashing.java
The derived work can be found in:
org.apache.nifi.controller.queue.clustered.partition.CorrelationAttributePartitioner

===========================================
Apache Software License v2
===========================================
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,13 @@ Copyright 2014-2024 The Apache Software Foundation
This product includes software developed at
The Apache Software Foundation (http://www.apache.org/).

This product includes derived works from Google Guava (ASLv2 licensed)
Copyright 2011 The Guava Authors
The derived work is adapted from the class com.google.common.hash.Hashing:
https://github.com/google/guava/blob/master/guava/src/com/google/common/hash/Hashing.java
The derived work can be found in:
org.apache.nifi.controller.queue.clustered.partition.CorrelationAttributePartitioner

******************
Apache Software License v2
******************
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,13 @@

import java.util.ArrayList;
import java.util.List;
import java.util.Random;

public class CorrelationAttributePartitioner implements FlowFilePartitioner {
private static final int INDEX_OFFSET = 1;

// Multiplier from com.google.common.hash.Hashing.LinearCongruentialGenerator
private static final long LCG_MULTIPLIER = 2862933555777941757L;

private static final Logger logger = LoggerFactory.getLogger(CorrelationAttributePartitioner.class);

private final String partitioningAttribute;
Expand Down Expand Up @@ -69,7 +73,39 @@ public boolean isRebalanceOnFailure() {
}

/**
 * Selects a partition index for the given hash using the consistent hash approach adapted from
 * Google Guava com.google.common.hash.Hashing.consistentHash(). The generator is seeded with the hash,
 * so the same hash and number of partitions always produce the same index.
 *
 * @param hash value used to seed the linear congruential generator
 * @param partitions number of available partitions; values less than one always produce index 0
 * @return index from 0 inclusive to partitions exclusive
 */
private int findIndex(final long hash, final int partitions) {
    final LinearCongruentialGenerator generator = new LinearCongruentialGenerator(hash);
    int candidate = 0;

    // Jump forward from candidate to candidate until the next jump leaves the range of partitions
    while (true) {
        final int next = (int) ((candidate + INDEX_OFFSET) / generator.nextDouble());
        if (next < 0 || next >= partitions) {
            // The last in-range candidate is the selected index. Returning it without further adjustment
            // keeps every index selectable, including the last one, matching the Guava implementation.
            return candidate;
        }
        candidate = next;
    }
}

/**
 * Linear congruential generator adapted from Google Guava
 * com.google.common.hash.Hashing.LinearCongruentialGenerator, producing a repeatable sequence
 * of doubles for a given seed.
 */
private static final class LinearCongruentialGenerator {
    private long current;

    private LinearCongruentialGenerator(final long seed) {
        current = seed;
    }

    /**
     * Advances the generator state and derives the next value from the upper 31 bits of the state.
     *
     * @return next generated value
     */
    private double nextDouble() {
        current = LCG_MULTIPLIER * current + INDEX_OFFSET;

        // Keep the upper 31 bits and add one using int arithmetic before widening to double
        final int upperBits = (int) (current >>> 33);
        final double numerator = upperBits + 1;
        return numerator / 0x1.0p31;
    }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.nifi.controller.queue.clustered.partition;

import org.apache.nifi.controller.repository.FlowFileRecord;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.when;

/**
 * Tests for CorrelationAttributePartitioner covering the rebalance settings and the partition
 * selected for null and defined values of the partitioning attribute.
 */
@ExtendWith(MockitoExtension.class)
class CorrelationAttributePartitionerTest {
    private static final String PARTITIONING_ATTRIBUTE = "group";

    private static final String FIRST_ATTRIBUTE = "1";

    private static final String SECOND_ATTRIBUTE = "2";

    @Mock
    private FlowFileRecord flowFileRecord;

    @Mock
    private QueuePartition localPartition;

    @Mock
    private QueuePartition firstPartition;

    @Mock
    private QueuePartition secondPartition;

    @Mock
    private QueuePartition thirdPartition;

    private CorrelationAttributePartitioner partitioner;

    @BeforeEach
    void setPartitioner() {
        partitioner = new CorrelationAttributePartitioner(PARTITIONING_ATTRIBUTE);
    }

    @Test
    void testRebalanceOnClusterResize() {
        final boolean rebalanceOnClusterResize = partitioner.isRebalanceOnClusterResize();

        assertTrue(rebalanceOnClusterResize);
    }

    @Test
    void testRebalanceOnFailure() {
        final boolean rebalanceOnFailure = partitioner.isRebalanceOnFailure();

        assertFalse(rebalanceOnFailure);
    }

    @Test
    void testGetPartitionOnePartitionNullAttribute() {
        // Partitioning attribute is not stubbed so the mocked FlowFile returns null
        final QueuePartition[] available = {firstPartition};

        assertEquals(firstPartition, select(available));
    }

    @Test
    void testGetPartitionTwoPartitionsNullAttribute() {
        // Partitioning attribute is not stubbed so the mocked FlowFile returns null
        final QueuePartition[] available = {firstPartition, secondPartition};

        assertEquals(firstPartition, select(available));
    }

    @Test
    void testGetPartitionThreePartitionsAttributeDefined() {
        final QueuePartition[] available = {firstPartition, secondPartition, thirdPartition};

        // First attribute value selects the first partition
        stubPartitioningAttribute(FIRST_ATTRIBUTE);
        assertEquals(firstPartition, select(available));

        // Second attribute value selects the second partition on repeated calls
        stubPartitioningAttribute(SECOND_ATTRIBUTE);
        assertEquals(secondPartition, select(available));
        assertEquals(secondPartition, select(available));

        // Returning to the first attribute value selects the first partition again
        stubPartitioningAttribute(FIRST_ATTRIBUTE);
        assertEquals(firstPartition, select(available));
    }

    // Configures the mocked FlowFile to return the specified value for the partitioning attribute
    private void stubPartitioningAttribute(final String attributeValue) {
        when(flowFileRecord.getAttribute(eq(PARTITIONING_ATTRIBUTE))).thenReturn(attributeValue);
    }

    // Runs the partitioner for the mocked FlowFile against the available partitions
    private QueuePartition select(final QueuePartition[] available) {
        return partitioner.getPartition(flowFileRecord, available, localPartition);
    }
}

0 comments on commit 9b84dd4

Please sign in to comment.