Apache Beam State API

(Last updated: API version 2.0.0)

Beam exposes a State API that lets DoFn classes read and write arbitrary state that persists across individual stream elements. To use it, a DoFn must be of type DoFn<KV<Key, Value>, ?>, and a separate state is maintained for each distinct value of Key (and, when windowing is in effect, for each window). For example, here is a DoFn that maintains a separate counter for each distinct integer key in its input PCollection:

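import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

class CountingDoFn extends DoFn<KV<Integer, String>, String>
{
  private static final String STATE_ID = "mystate";

  // Declares one ValueState<Integer> cell; the runner keeps a separate
  // instance of it for every key (and window).
  @StateId(STATE_ID)
  private final StateSpec<ValueState<Integer>> stateCell
    = StateSpecs.value(VarIntCoder.of());

  @ProcessElement
  public void process(ProcessContext context,
                      @StateId(STATE_ID)
                      ValueState<Integer> state)
  {
    // read() returns null the first time this key is seen.
    final Integer val = state.read();
    final int counter = (val == null) ? 1 : val;
    // Remember the next number to hand out for this key.
    state.write(counter + 1);
    // Emit "<counter>:<value>", e.g. "1:foo" for a key's first element.
    context.output(String.valueOf(counter)
      + ":" + context.element().getValue());
  }
}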
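To make the per-key behaviour concrete without a running pipeline, here is a minimal plain-Java model of what CountingDoFn does. It is only a sketch under stated assumptions: the class name PerKeyCounterModel and its HashMap are hypothetical stand-ins for the state cells a Beam runner manages, not Beam API, and windows are ignored.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, dependency-free model of CountingDoFn's per-key state.
// The HashMap is an illustrative stand-in for the runner's state cells;
// it is NOT how Beam actually stores state.
public class PerKeyCounterModel
{
  // One "ValueState<Integer>" per distinct key. A missing entry plays the
  // role of state.read() returning null.
  private final Map<Integer, Integer> stateByKey = new HashMap<>();

  // Mirrors CountingDoFn.process: read, default to 1, write counter + 1, emit.
  public String process(int key, String value)
  {
    final Integer val = stateByKey.get(key);
    final int counter = (val == null) ? 1 : val;
    stateByKey.put(key, counter + 1);
    return counter + ":" + value;
  }

  public static void main(String[] args)
  {
    PerKeyCounterModel model = new PerKeyCounterModel();
    List<String> out = new ArrayList<>();
    out.add(model.process(7, "a"));
    out.add(model.process(7, "b"));
    out.add(model.process(3, "c")); // new key, so its counter starts at 1
    out.add(model.process(7, "d"));
    System.out.println(out); // prints [1:a, 2:b, 1:c, 3:d]
  }
}
```

Each key counts independently, which is the point of keyed state. Note that in a real pipeline the order in which a runner delivers elements for a given key is not specified, so which value receives which number may differ from run to run.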
