Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Issue #454 ColumnFamilyIterator #456

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
package me.prettyprint.cassandra.service;

import java.util.Iterator;

import me.prettyprint.hector.api.Keyspace;
import me.prettyprint.hector.api.beans.HColumn;
import me.prettyprint.hector.api.query.SliceQuery;
import me.prettyprint.hector.api.Serializer;

/**
* Iterate all the columns in all the rows in the specified column family. It uses
* KeyIterator to iterate the rows and ColumnSliceIterator to iterate columns in a row.
*
* @author Bharatendra Boddu
*
* @param <K> Row key type
* @param <N> Column name type
* @param <V> Column value type
*/
public class ColumnFamilyIterator<K, N, V> implements Iterator<HColumn<N, V>> {
private int maxColumnsCount;
private N startColumn;
private N finishColumn;
private long totalRows;
private long totalColumns;
private Iterator<K> rowsIterator;
private Iterator<HColumn<N, V>> colsIterator;
private SliceQuery<K, N, V> sliceQuery;

/**
* Constructor - Iterate over all the rows
*
* @param ks Keyspace
* @param cf Column family
* @param serializer Key type serializer
* @param query Slice query object
* @param maxRows Max number of rows to retrieve per range query
* @param maxCols Max number of columns to retrieve per slice query
*/
public ColumnFamilyIterator(final Keyspace ks, final String cf,
final Serializer<K> serializer,
SliceQuery<K, N, V> query,
final int maxRows, final int maxCols) {
this(ks, cf, serializer, null, null, query, maxRows, maxCols);
}

/**
* Constructor - Iterate over the rows specified between start and finish
*
* @param ks Keyspace
* @param cf Column family
* @param serializer Key type serializer
* @param start Start row key
* @param finish End row key
* @param query Slice query object
* @param maxRows Max number of rows to retrieve per range query
* @param maxCols Max number of columns to retrieve per slice query
*/
public ColumnFamilyIterator(final Keyspace ks, final String cf,
final Serializer<K> serializer,
final K start, final K finish,
SliceQuery<K, N, V> query,
final int maxRows, final int maxCols) {
this(new KeyIterator<K>(ks, cf, serializer, start, finish, maxRows), cf, query, maxCols);
}
/**
* Constructor - Iterate the rows using keyIterator
*
* @param keyIterator Key iterator
* @param cf Column family
* @param query Slice query object
* @param maxCols Max number of columns to retrieve per slice query
*/
public ColumnFamilyIterator(final KeyIterator<K> keyIterator, final String cf, SliceQuery<K, N, V> query, final int maxCols) {
this(keyIterator, cf, query, null, null, maxCols);
}

/**
* Constructor - Iterate the rows using keyIterator and in each row iterate over the column in the range
* [startColumn:finishColumn]
*
* @param keyIterator Key iterator
* @param cf Column family
* @param query Slice query object
* @param startColumn Start column name
* @param finishColumn End column name
* @param maxCols Max number of columns to retrieve per slice query
*/
public ColumnFamilyIterator(final KeyIterator<K> keyIterator, final String cf, SliceQuery<K, N, V> query,
final N startCol, final N finishCol, final int maxCols) {
maxColumnsCount = maxCols;
startColumn = startCol;
finishColumn = finishCol;
totalRows = 0;
totalColumns = 0;
rowsIterator = keyIterator.iterator();
colsIterator = null;
sliceQuery = query;
sliceQuery.setColumnFamily(cf);
}

@Override
public boolean hasNext() {
if (colsIterator == null || !colsIterator.hasNext()) {
if (rowsIterator.hasNext()) {
K nextRowKey = rowsIterator.next();
totalRows++;
sliceQuery.setKey(nextRowKey);
colsIterator = new ColumnSliceIterator(sliceQuery, startColumn, finishColumn, false, maxColumnsCount);
return hasNext();
}
}
return (colsIterator != null) ? colsIterator.hasNext() : false;
}

@Override
public HColumn<N, V> next() {
HColumn<N, V> column = colsIterator.next();
totalColumns++;
return column;
}

@Override
public void remove() {
}

public long getTotalRowsCount() {
return totalRows;
}

public long getTotalColumnsCount() {
return totalColumns;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package me.prettyprint.cassandra.service;

import static me.prettyprint.hector.api.factory.HFactory.createColumn;
import static me.prettyprint.hector.api.factory.HFactory.createKeyspace;
import static me.prettyprint.hector.api.factory.HFactory.createMutator;
import static me.prettyprint.hector.api.factory.HFactory.getOrCreateCluster;
import static org.junit.Assert.assertEquals;
import me.prettyprint.cassandra.BaseEmbededServerSetupTest;
import me.prettyprint.cassandra.serializers.IntegerSerializer;
import me.prettyprint.cassandra.serializers.StringSerializer;
import me.prettyprint.hector.api.Cluster;
import me.prettyprint.hector.api.Keyspace;
import me.prettyprint.hector.api.mutation.Mutator;
import me.prettyprint.hector.api.factory.HFactory;
import me.prettyprint.hector.api.beans.HColumn;

import org.junit.After;
import org.junit.Before;
import org.junit.Test;

public class ColumnFamilyIteratorTest extends BaseEmbededServerSetupTest {

private static final StringSerializer se = new StringSerializer();
private static final IntegerSerializer is = IntegerSerializer.get();

private static final String CF = "Standard1";

private Cluster cluster;
private Keyspace keyspace;

@Before
public void setupCase() {
cluster = getOrCreateCluster("Test Cluster", "127.0.0.1:9170");
keyspace = createKeyspace("Keyspace1", cluster);
// Insert 10 rows
Mutator<String> m = createMutator(keyspace, se);
for (int i = 1; i <= 10; i++) {
// For each row insert 1000 columns
for (int j = 0; j < 1000; j++) {
m.addInsertion("k" + i, CF, createColumn(String.format("c%03d", j), new Integer(j), se, is));
}
}
m.execute();
}

@After
public void teardownCase() {
keyspace = null;
cluster = null;
}

@Test
public void testIterator() {
// Iterate over all the columns in all rows
ColumnFamilyIterator<String, String, Integer> it =
new ColumnFamilyIterator<String, String, Integer>(keyspace, CF, se,
HFactory.createSliceQuery(keyspace, se, se, is),
2, 100);
while(it.hasNext()) {
it.next();
}
assertEquals(it.getTotalRowsCount(), 10);
assertEquals(it.getTotalColumnsCount(), 10000);
}

@Test
public void testRangeSliceIterator() {
// Iterate over all the columns in rows in range [k2:k7]
String start = "k2";
String end = "k7";
KeyIterator<String> keyIterator = new KeyIterator<String>(keyspace, CF, se, start, end, 2);
ColumnFamilyIterator<String, String, Integer> it =
new ColumnFamilyIterator<String, String, Integer>(keyIterator, CF,
HFactory.createSliceQuery(keyspace, se, se, is),
100);
while(it.hasNext()) {
it.next();
}
assertEquals(it.getTotalRowsCount(), 6);
assertEquals(it.getTotalColumnsCount(), 6000);
}

@Test
public void testColumnSliceIterator() {
// Iterate over columns in the range [c100:c199] in rows in range [k2:k7]
String start = "k2";
String end = "k7";
KeyIterator<String> keyIterator = new KeyIterator<String>(keyspace, CF, se, start, end, 2);
ColumnFamilyIterator<String, String, Integer> it =
new ColumnFamilyIterator<String, String, Integer>(keyIterator, CF,
HFactory.createSliceQuery(keyspace, se, se, is),
"c100", "c199", 10);
while(it.hasNext()) {
it.next();
}
assertEquals(it.getTotalRowsCount(), 6);
assertEquals(it.getTotalColumnsCount(), 600);
}
}