Creating a custom export connector that runs inside VoltDB

written by vdbdev on June 19, 2014

Overview

  1. Implement a new export connector.
  2. Compile the connector and place its .class file (or a jar containing it) in VoltDB’s classpath ($VOLTDB/lib/extension).
  3. Configure the connector in the deployment file.
  4. Run the server.

Implement a new export connector

An export connector, known internally as an ExportClient, receives blocks of row data from a producer within the database. The export
connector must hand off those rows to the downstream export data sink. The blocks are roughly
2MB in size and do not align with transactions. A block is guaranteed to contain complete
rows; that is, no single SQL INSERT to an export table is split across blocks.

The handoff from the internal producer to the export client follows a simple pattern:

producer -> client.onBlockStart
foreach row in block:
    producer -> client.processRow
producer -> client.onBlockCompletion

All calls in the pattern above run on the same thread. You do not need to synchronize access to the data structures used in client.onBlockStart, client.processRow, and client.onBlockCompletion unless other threads use them as well.

If the client fails at onBlockStart, processRow or onBlockCompletion, it must throw a RestartBlockException to prevent VoltDB from acking and dropping the export data from its durability control.

To repeat this important point in other words: if the ExportClient runs onBlockStart, processRow and onBlockCompletion without throwing, VoltDB assumes the data is remotely durable and that the VoltDB database no longer has durability responsibility for the export block.
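
The contract can be modelled with a toy simulation. This is a sketch under stated assumptions, not VoltDB internals: RetryException stands in for RestartBlockException, and Sink, FlakySink, and deliverUntilAcked are hypothetical names. The real retry and back-pressure policy belongs to VoltDB; the sketch only shows that a block is dropped once the handler returns without throwing, and is kept otherwise.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for RestartBlockException.
class RetryException extends Exception {}

// Hypothetical handler: consumes a whole block or throws to reject it.
interface Sink {
    void handleBlock(List<String> block) throws RetryException;
}

// Rejects a fixed number of deliveries before accepting a block.
class FlakySink implements Sink {
    private int failuresLeft;
    final List<String> delivered = new ArrayList<>();

    FlakySink(int failures) {
        this.failuresLeft = failures;
    }

    @Override
    public void handleBlock(List<String> block) throws RetryException {
        if (failuresLeft > 0) {
            failuresLeft--;
            throw new RetryException(); // block stays the producer's responsibility
        }
        delivered.addAll(block); // returning normally signals "remotely durable"
    }
}

class AckDemo {
    // Redelivers the block until the sink returns without throwing,
    // then reports how many attempts that took.
    static int deliverUntilAcked(List<String> block, Sink sink) {
        int attempts = 0;
        while (true) {
            attempts++;
            try {
                sink.handleBlock(block);
                return attempts; // acked: the producer may now drop the block
            } catch (RetryException e) {
                // not acked: keep the block and deliver it again
            }
        }
    }

    public static void main(String[] args) {
        FlakySink sink = new FlakySink(2);
        int attempts = deliverUntilAcked(Arrays.asList("r1", "r2"), sink);
        System.out.println(attempts + " attempts, delivered " + sink.delivered);
    }
}
```

Note that swallowing an error inside the handler would look like success to the producer, which is exactly the case the paragraphs above warn about.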

The ExportClient must not return from onBlockCompletion until it meets its hand-off
requirements. See getExecutor() below for some further commentary on correct
thread handling.
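
One way to meet this requirement is to buffer rows in processRow and flush them synchronously in onBlockCompletion, so the method returns only once the sink has confirmed the write. The following is a minimal sketch, assuming a hypothetical RemoteSink whose write() returns true when the batch is durable. BufferingHandler and FlushFailedException are also illustrative names, the latter standing in for RestartBlockException.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for RestartBlockException.
class FlushFailedException extends Exception {}

// Hypothetical downstream sink; write() returns true once the batch is durable.
interface RemoteSink {
    boolean write(List<String> batch);
}

class BufferingHandler {
    private final RemoteSink sink;
    // Plain ArrayList: all three callbacks run on the same thread.
    private final List<String> buffer = new ArrayList<>();

    BufferingHandler(RemoteSink sink) {
        this.sink = sink;
    }

    void onBlockStart() {
        buffer.clear(); // discard rows left over from a rejected block
    }

    void processRow(String row) {
        buffer.add(row);
    }

    // Returns only after the sink confirms the batch; throws otherwise.
    void onBlockCompletion() throws FlushFailedException {
        if (!sink.write(new ArrayList<>(buffer))) {
            throw new FlushFailedException();
        }
        buffer.clear();
    }

    public static void main(String[] args) throws FlushFailedException {
        List<String> stored = new ArrayList<>();
        BufferingHandler handler = new BufferingHandler(batch -> {
            stored.addAll(batch);
            return true;
        });
        handler.onBlockStart();
        handler.processRow("row-1");
        handler.processRow("row-2");
        handler.onBlockCompletion();
        System.out.println(stored);
    }
}
```

The buffer is cleared in onBlockStart as well as after a successful flush, so a block that is redelivered after a failure does not produce duplicate rows in the batch.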

A complete, but trivial, export client.

package org.voltdb.exportclient;

import java.io.IOException;
import java.util.Properties;
import org.voltcore.logging.VoltLogger;
import org.voltdb.export.AdvertisedDataSource;

import com.google_voltpatches.common.util.concurrent.ListeningExecutorService;

public class PrinterExportClient extends ExportClientBase
{
    private static final VoltLogger logger = new VoltLogger("ExportClient");

    public PrinterExportClient() {
        logger.info("Creating Printer export client.");
    }

    @Override
    public void configure(Properties config) throws Exception {
        // passed deployment.xml export <configuration/> properties.
    }

    @Override
    public ExportDecoderBase constructExportDecoder(AdvertisedDataSource source) {
        // AdvertisedDataSource represents a <partition, table, generation>
        // triple. Partitions and tables are probably self-explanatory. Generations
        // are created across server reboots or schema changes so that old resources
        // are advertised until fully drained. The ExportClient's primary role is
        // to serve as a factory, providing concrete ExportDecoder implementations
        // for AdvertisedDataSources.
        return new PrinterExportDecoder(source);
    }

    static class PrinterExportDecoder extends ExportDecoderBase
    {
        PrinterExportDecoder(AdvertisedDataSource source) {
            super(source);
        }

        @Override
        public void sourceNoLongerAdvertised(AdvertisedDataSource source) {
            // The AdvertisedDataSource is no longer available. If file descriptors
            // or threads were allocated to handle the source, free them here.
        }

        @Override
        public void onBlockStart() throws RestartBlockException {
            // Default implementation is to do nothing
            // Override to provide per-block start handling.
        }

        @Override
        public boolean processRow(int rowSize, byte[] rowData) throws RestartBlockException {
            // Process row is the workhorse that handles each row of export data.
            try {
                // Process one row from the current block: useful export work goes here.
                Object[] pojo = this.decodeRow(rowData);
                System.out.println(pojo[0]);
            } catch (IOException e) {
                // throw restart with `true` to reject the block and apply back pressure.
                throw new RestartBlockException(true);
            }
            return true;  // see ENG-5116
        }

        @Override
        public void onBlockCompletion() throws RestartBlockException {
            // Default implementation is to do nothing.
            // Override to provide per-block end handling.
        }

        @Override
        public ListeningExecutorService getExecutor() {
            // By default, will return same-thread-executor. A note about threading
            // in the export client. Processing for an advertised data source is
            // done on an internal thread pool. You MUST NOT block the threads in
            // that pool. For example, Snapshots, Export overflow and other durability
            // mechanisms must schedule work on that thread pool to make progress. If
            // you block the default thread, you block Snapshot and Export overflow.
            // So -- either be kind -- or provide your own executor service here.
            return super.getExecutor();
        }
    }
}
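
The getExecutor() comment above suggests supplying your own executor when per-row work may block. Here is a minimal sketch of a dedicated single-thread executor using only java.util.concurrent; ExportExecutors and newDedicatedExecutor are hypothetical names. Because getExecutor() returns a ListeningExecutorService, a real connector would still need to wrap the result, for example with Guava's MoreExecutors.listeningDecorator, assuming VoltDB's repackaged Guava (com.google_voltpatches) exposes it unchanged.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

class ExportExecutors {
    // Creates one named daemon thread so that slow sink I/O runs on its own
    // thread instead of a shared pool.
    static ExecutorService newDedicatedExecutor(final String name) {
        ThreadFactory factory = runnable -> {
            Thread thread = new Thread(runnable, name);
            thread.setDaemon(true); // do not keep the JVM alive on shutdown
            return thread;
        };
        return Executors.newSingleThreadExecutor(factory);
    }

    public static void main(String[] args) throws Exception {
        ExecutorService executor = newDedicatedExecutor("printer-export");
        try {
            // Report which thread actually runs the submitted work.
            Callable<String> task = () -> Thread.currentThread().getName();
            System.out.println("work ran on " + executor.submit(task).get());
        } finally {
            executor.shutdown();
        }
    }
}
```

A single thread preserves row order within a data source; a larger pool would need its own ordering guarantees.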

Compilation

You will need the voltdb-x.y.jar to compile the custom export connector.

Depending on your IDE or Java development environment, you may prefer other methods… but here’s my approach:

  1. Install the enterprise voltdb tarball (I untar to $HOME/vdb).
  2. mkdir /tmp/customexport
  3. cd $HOME/vdb/voltdb-ent-4.4
  4. javac -cp voltdb/voltdb-4.4.jar -d /tmp/customexport src/org/voltdb/exportclient/PrinterExportClient.java
  5. cd /tmp/customexport
  6. jar cvf $HOME/vdb/voltdb-ent-4.4/lib/extension/customexport.jar *

Configure the export connector in the deployment.xml file

To configure the connector, designate its class in the deployment.xml file and pass it whatever key/value properties it requires.

<?xml version="1.0"?>
<deployment>
    <cluster hostcount="1" sitesperhost="2" kfactor="0" />
    <httpd enabled="true">
        <jsonapi enabled="true" />
    </httpd>
    <export enabled="true" target="custom" exportconnectorclass="org.voltdb.exportclient.PrinterExportClient">
        <configuration>
            <property name="property1">value1</property>
        </configuration>
    </export>
</deployment>
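
The <property> elements reach the connector through configure(Properties). Below is a minimal sketch of reading and validating them using only java.util.Properties. The ConnectorConfig class and the optional batch.size key are hypothetical; property1 is the key from the deployment file above.

```java
import java.util.Properties;

class ConnectorConfig {
    final String property1;
    final int batchSize;

    private ConnectorConfig(String property1, int batchSize) {
        this.property1 = property1;
        this.batchSize = batchSize;
    }

    // Builds a config from the properties handed to configure(),
    // failing fast on missing or malformed values.
    static ConnectorConfig from(Properties config) {
        String p1 = config.getProperty("property1");
        if (p1 == null || p1.trim().isEmpty()) {
            throw new IllegalArgumentException("property1 is required");
        }
        int batchSize;
        try {
            // batch.size is a hypothetical optional key with a default of 100.
            batchSize = Integer.parseInt(config.getProperty("batch.size", "100").trim());
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("batch.size must be an integer", e);
        }
        return new ConnectorConfig(p1.trim(), batchSize);
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("property1", "value1");
        ConnectorConfig config = ConnectorConfig.from(props);
        System.out.println(config.property1 + " / " + config.batchSize);
    }
}
```

Since configure() is declared to throw Exception, rejecting bad input there surfaces a misconfigured deployment when the connector is set up rather than when the first row arrives.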

Running the custom export connector

Place the compiled connector in VoltDB’s classpath ($VOLTDB/lib/extension) and start VoltDB. The server instantiates the custom connector, and the message logged in its constructor appears in logs/volt.txt:

2013-09-03 17:33:15,365 INFO [main] ExportClient: Creating Printer export client.

When you run a stored procedure that writes to an export table, the System.out.println() call in processRow writes to stdout.

Running the custom export connector using VoltDB Enterprise Manager

  1. Place the compiled .jar file in lib/extension.
  2. Start Enterprise Manager using VOLTDB_OPTS to set the export class:
    export VOLTDB_OPTS="-D__EXPORT_TO_TYPE__='org.voltdb.exportclient.PrinterExportClient'"
    
  3. Send a REST request to create/modify the deployment:
    curl -v --data '{"Deployment":{"name":"test",  "kfactor":"0",  "voltroot":"/tmp/test","export" : "true", "exporttype":"", "exportparameters" : { "param1":"val1", "param2":"val2"}}}'