
CellTransformer Examples

bfemiano edited this page Sep 17, 2012 · 12 revisions

The basic Accumulo reader, AccumuloDBResultReader, is designed to handle most common scan operations and is rarely something you need to extend.

The CellTransformer you supply to the read() operation is where the magic happens. You can supply your own as an anonymous inner class passed to the reader, supply a prepared class that implements CellTransformer, or use one of the prebuilt transformers in AccumuloCellTransformers.

A CellTransformer defines how to take an incoming key/value pair and produce cells for a CellGroup.

Let's take a look at what this actually means.

Below is a basic example of an Accumulo CellTransformer that takes an incoming cell group and key/value pair, outputting a cell group.

public CellGroup<SecurityStringValueCell> apply(Map.Entry<Key, Value> dbItem,
                                                CellGroup<SecurityStringValueCell> cellGroup) {
    String activeRowId = dbItem.getKey().getRow().toString();
    if (!cellGroup.getTag().equals(activeRowId)) {
        cellGroup = new CellGroup<SecurityStringValueCell>(activeRowId);
    }
    String label = dbItem.getKey().getColumnQualifier().toString();
    byte[] valueBytes = dbItem.getValue().get();
    String value = valueBytes.length > 0 ? new String(valueBytes) : "";
    SecurityStringValueCell cell = new SecurityStringValueCell(label, value);
    cellGroup.addCell(cell);
    return cellGroup;
}

The transformer instantiates a new cell group if the active key/value pair contains a different rowID than the incoming cell group. The default reader, AccumuloDBResultReader, will inject the same cell group reference until the transformer creates a new one. Whether or not to create a new cell group instance is left entirely up to the transformer. In this case, we wish to create a new group per incoming rowID in the scan.

If the value is not empty, the transformer creates a String from its bytes; otherwise it uses an empty String. This String becomes the value for the new cell. The transformer then creates a new SecurityStringValueCell with the label and value, adds it to the group, and returns the group, which is newly created only when the rowID has changed.

Subsequent calls to this apply method will receive the same cell group reference until a key/value pair arrives with a new rowID. Since Accumulo scans return key/value pairs in rowID-sorted order, we can safely assume that the CellGroup for the previous rowID contains all the relevant information by the time we see the next rowID in the scan.
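The hand-off between reader and transformer described above can be sketched as follows. This is only an illustration under stated assumptions, not the library's implementation: SketchGroup, SketchTransformer, and ReaderLoopSketch are hypothetical stand-ins, plain Strings replace Accumulo's Key and Value, and the reader is assumed to start with an empty placeholder group that is never emitted.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for CellGroup: a tag (the rowID) plus its cells.
class SketchGroup {
    final String tag;
    final List<String> cells = new ArrayList<>();
    SketchGroup(String tag) { this.tag = tag; }
}

// Hypothetical stand-in for CellTransformer; entry key = rowID, entry value = cell content.
interface SketchTransformer {
    SketchGroup apply(Map.Entry<String, String> dbItem, SketchGroup group);
}

class ReaderLoopSketch {
    // Same decision as the example transformer: start a new group when the rowID changes.
    static final SketchTransformer GROUP_PER_ROW = (dbItem, group) -> {
        if (!group.tag.equals(dbItem.getKey())) {
            group = new SketchGroup(dbItem.getKey());
        }
        group.cells.add(dbItem.getValue());
        return group;
    };

    static Map.Entry<String, String> kv(String rowId, String cell) {
        return new SimpleEntry<>(rowId, cell);
    }

    // Assumed reader behavior: keep injecting the same group reference, and
    // record a group the first time the transformer returns a new reference.
    static List<SketchGroup> read(List<Map.Entry<String, String>> sortedScan,
                                  SketchTransformer transformer) {
        List<SketchGroup> results = new ArrayList<>();
        SketchGroup current = new SketchGroup(""); // assumed placeholder, never emitted
        for (Map.Entry<String, String> dbItem : sortedScan) {
            SketchGroup returned = transformer.apply(dbItem, current);
            if (returned != current) {
                results.add(returned);
                current = returned;
            }
        }
        return results;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> scan =
                Arrays.asList(kv("r1", "a"), kv("r1", "b"), kv("r2", "c"));
        for (SketchGroup g : read(scan, GROUP_PER_ROW)) {
            System.out.println(g.tag + " -> " + g.cells);
        }
    }
}
```

Running main on the three sorted pairs yields two groups, one per rowID, with the r1 group holding both of its cells. Because the scan is sorted, the r1 group is already complete when r2 first appears.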

Let's look at another example where we want slightly modified behavior during our scan. Often you'll want to scan over a particular column family where the qualifiers contain the data of interest and the value can be ignored.

The transformer below accomplishes this. If the column family of a given key/value pair appears in the supplied map, the cell is labeled with the corresponding map value and the cell value is set to the current qualifier. If the column family is not in the map, the transformer follows the same behavior as our earlier example: the qualifier becomes the label, and the value is a String built from the value bytes.

The following code is from the static method colFamToCommonLabelOnMatches located in AccumuloCellTransformers:

public static CellTransformer<Map.Entry<Key, Value>, SecurityStringValueCell> colFamToCommonLabelOnMatches(
        final Map<String, String> colFamToCommonLabel) {
    return new CellTransformer<Map.Entry<Key, Value>, SecurityStringValueCell>() {
        public CellGroup<SecurityStringValueCell> apply(Map.Entry<Key, Value> dbItem,
                                                        CellGroup<SecurityStringValueCell> cellGroup) throws CellExtractorException {
            String activeRowId = dbItem.getKey().getRow().toString();
            if (!cellGroup.getTag().equals(activeRowId)) {
                cellGroup = new CellGroup<SecurityStringValueCell>(activeRowId);
            }
            String colFamStr = dbItem.getKey().getColumnFamily().toString();
            String label = dbItem.getKey().getColumnQualifier().toString();
            String value = new String(dbItem.getValue().get());
            if (colFamToCommonLabel.containsKey(colFamStr)) {
                value = label;
                label = colFamToCommonLabel.get(colFamStr);
            }
            SecurityStringValueCell cell = new SecurityStringValueCell(label, value, colFamStr);
            cellGroup.addCell(cell);
            return cellGroup;
        }
    };
}

This method takes a final map from column family to the common label to apply for that family. As before, the transformer creates a new cell group whenever the rowID changes. By default, the qualifier becomes the label and the value bytes become the String value. However, if colFamToCommonLabel.containsKey(colFamStr) evaluates to true, the value is set to the current label String (the qualifier) and the label is reset to the map value for colFamStr. Either way, the cell is created with the label, value, and active column family, added to the group, and the group is returned.
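To make the two branches concrete, here is a minimal sketch that isolates just the label/value decision using plain Strings. ColFamLabelSketch and its labelAndValue helper are hypothetical names introduced for illustration, as are the "friends" and "info" column families; none of them come from the library.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the label/value decision only. The real transformer
// builds SecurityStringValueCell instances from Accumulo Key/Value pairs.
class ColFamLabelSketch {
    // Returns {label, value} for a single key/value pair.
    static String[] labelAndValue(String colFam, String qualifier, String valueStr,
                                  Map<String, String> colFamToCommonLabel) {
        String label = qualifier;   // default: qualifier is the label
        String value = valueStr;    // default: value bytes as a String
        if (colFamToCommonLabel.containsKey(colFam)) {
            value = label;                            // qualifier carries the data
            label = colFamToCommonLabel.get(colFam);  // common label from the map
        }
        return new String[] {label, value};
    }

    public static void main(String[] args) {
        Map<String, String> mapping = new HashMap<>();
        mapping.put("friends", "friend"); // assumed example mapping

        String[] matched = labelAndValue("friends", "alice", "", mapping);
        String[] unmatched = labelAndValue("info", "age", "30", mapping);
        System.out.println(matched[0] + " = " + matched[1]);
        System.out.println(unmatched[0] + " = " + unmatched[1]);
    }
}
```

With this mapping, a pair in the "friends" family with qualifier "alice" becomes a cell labeled "friend" with value "alice", while a pair in the unmapped "info" family keeps its qualifier "age" as the label and "30" as the value.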

You can also do aggregation over your results directly with CellTransformers.

Instead of using the default Accumulo reader, you can instantiate AccumuloAggregateDBResultReader. This class is very similar to the default reader, with one critical difference: after the scan iteration, a final apply() call is made to the transformer with a null key/value pair. Cell transformers that aggregate results can treat the incoming null as a flag that iteration is finished and all key/value pairs have been seen.
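The effect of that final null call can be sketched with simplified stand-ins. Everything below is hypothetical and only mirrors the behavior described in the text: a String replaces the key/value pair, a List<Integer> replaces the cell group, and the finalNullCall flag switches between aggregate-style and default-style iteration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for an aggregating CellTransformer.
interface AggregateSketchTransformer {
    List<Integer> apply(String dbItem, List<Integer> group);
}

// Counts every pair it sees and emits the total only when it receives null.
class PairCountSketch implements AggregateSketchTransformer {
    private int seen = 0;

    public List<Integer> apply(String dbItem, List<Integer> group) {
        if (dbItem != null) {
            seen++;              // still iterating
        } else {
            group.add(seen);     // null signals the end of the scan
        }
        return group;
    }
}

class AggregateReadSketch {
    // finalNullCall = true mimics the aggregate reader as described in the text;
    // false mimics a reader that never passes null.
    static List<Integer> read(List<String> scan, AggregateSketchTransformer transformer,
                              boolean finalNullCall) {
        List<Integer> group = new ArrayList<>();
        for (String dbItem : scan) {
            group = transformer.apply(dbItem, group);
        }
        if (finalNullCall) {
            group = transformer.apply(null, group);
        }
        return group;
    }

    public static void main(String[] args) {
        List<String> scan = Arrays.asList("kv1", "kv2", "kv3");
        System.out.println(read(scan, new PairCountSketch(), true));
        System.out.println(read(scan, new PairCountSketch(), false));
    }
}
```

With the final null call the group holds the single total, 3. Without it the group stays empty, because the transformer never learns that the scan has ended.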

Here's an example that counts all the distinct rowIDs seen during the scan and outputs a single cell group with a single cell holding the count.

This was taken from the static method distinctRowIDCount() located in AccumuloCellTransformers:

public static CellTransformer<Map.Entry<Key, Value>, IntValueCell> distinctRowIDCount() {
    return new CellTransformer<Map.Entry<Key, Value>, IntValueCell>() {
        private int totalCount = 0;
        private static final String LABEL = "rowIdCount";
        private String prevLabel = null;

        public CellGroup<IntValueCell> apply(Map.Entry<Key, Value> dbItem, CellGroup<IntValueCell> group)
                throws CellExtractorException {
            if (dbItem != null) {
                if (prevLabel != null && !prevLabel.equals(dbItem.getKey().getRow().toString())) {
                    totalCount++;
                }
                prevLabel = dbItem.getKey().getRow().toString();
            } else {
                // The first rowID never triggers a transition; offset by one to account for this.
                IntValueCell cell = new IntValueCell(LABEL, totalCount + 1);
                group.addCell(cell);
            }
            return group;
        }
    };
}

First we check whether dbItem is null, which would signal the end of iteration. If it holds a key/value pair, we compare the current rowID against prevLabel (which, despite its name, stores the previous rowID). If they differ and this is not the first item in the scan (hence the prevLabel != null check), we increment the total count. We then set prevLabel to the current rowID and continue the iteration. This works because Accumulo returns key/value pairs in rowID-sorted order: a change in the active rowID means we've seen all the key/value pairs for the previous rowID. Since the first rowID never triggers a transition, the final count is offset by one. Once we see a null dbItem, we know the reader is done iterating and we can write the aggregate count to a single IntValueCell. The returned cell group contains just this single cell.
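The counting idea can be checked in isolation on a plain sorted list of rowIDs. The sketch below is a hypothetical variant, not the library's code: DistinctRowCountSketch counts the first rowID directly instead of adding one at the end. The difference matters only for an empty scan, where the transformer as written would emit totalCount + 1 = 1 and this variant returns 0.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical helper that counts distinct rowIDs in an already-sorted list,
// which is the order a scan would deliver them in.
class DistinctRowCountSketch {
    static int countDistinctSorted(List<String> sortedRowIds) {
        int distinct = 0;
        String prev = null;
        for (String rowId : sortedRowIds) {
            if (!rowId.equals(prev)) { // first item, or a transition to a new rowID
                distinct++;
            }
            prev = rowId;
        }
        return distinct;
    }

    public static void main(String[] args) {
        // Five key/value pairs spread over three rows.
        System.out.println(countDistinctSorted(Arrays.asList("r1", "r1", "r2", "r3", "r3")));
        // An empty scan has no rows.
        System.out.println(countDistinctSorted(Collections.<String>emptyList()));
    }
}
```

Both approaches rely on sorted input: on unsorted rowIDs a row that reappears later would be counted again.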

Note: This transformer will only work with AccumuloAggregateDBResultReader. The default Accumulo reader never passes a null dbItem, so the final result is never output. Your aggregate scan would return an empty list of CellGroups, which is undesirable.