Akka Typed JavaDSLでConcurrent Theta Sketchする

Akka Typed JavaDSLとDataSketchesをとりあえず使ってみるだけの備忘録。DataSketchesの中でもTheta Sketch Frameworkを使用する。今回やっていることは以下とほぼ同じ。

Concurrent Theta Sketchは自力で書かなくても、UpdateSketchBuilder#buildShared と UpdateSketchBuilder#buildLocal が公式に提供されているので、実際にやる場合はこちらを使うべき。

まず、gradle経由でakka typedを使うには、最低限scalaプラグインを適用してやる必要がある。更に適当なAkka Typedの依存と、DataSketchesの依存を追加する。

// Build configuration for the example: Java + Scala plugins, plus the
// DataSketches and Akka Typed libraries used by the actors below.
plugins {
    id 'java'
    id 'scala'
}
dependencies {
    // 'implementation' replaces the 'compile' configuration, which is
    // deprecated and no longer exists in Gradle 7 and later.
    implementation group: 'org.apache.datasketches', name: 'datasketches-java', version: '1.2.0-incubating'
    implementation group: 'com.typesafe.akka', name: 'akka-actor-typed_2.13', version: '2.6.4'
}

ローカルスケッチを更新するためのSketchActorを用意。単一のローカルスケッチをスレッドセーフに更新する。また、定期的にコンパクションしたローカルスケッチを、後述するSketchUnionActorに送信する。

import akka.actor.typed.ActorRef;
import akka.actor.typed.Behavior;
import akka.actor.typed.javadsl.*;
import org.apache.datasketches.theta.UpdateSketch;

import java.time.Duration;

public class SketchActor extends AbstractBehavior<SketchActor.Protocol> {

    public interface Protocol {}

    public static final class SketchEvent implements Protocol {
        private final long value;
        public SketchEvent(long value) {
            this.value = value;
        }
        public long getValue() {
            return value;
        }
    }

    private static final class PersistRequest implements Protocol {
        private PersistRequest() {}
        private final static PersistRequest instance = new PersistRequest();
        public static PersistRequest getInstance() {
            return instance;
        }
    }

    private final UpdateSketch updateSketch;
    private final ActorRef<SketchUnionActor.UnionSketch> unionActorRef;

    private SketchActor(
            ActorContext<Protocol> context,
            UpdateSketch updateSketch,
            ActorRef<SketchUnionActor.UnionSketch> unionActorRef) {
        super(context);
        this.updateSketch = updateSketch;
        this.unionActorRef = unionActorRef;
    }

    public static Behavior<Protocol> create(
            ActorRef<SketchUnionActor.UnionSketch> unionActorRef,
            Duration unionInitialDuration, Duration unionRate) {
        return Behaviors.setup(ctx -> {
            ctx.getSystem().scheduler()
                    .scheduleAtFixedRate(
                            unionInitialDuration, unionRate,
                            () -> ctx.getSelf().tell(PersistRequest.getInstance()),
                            ctx.getSystem().executionContext());

            return new SketchActor(ctx, UpdateSketch.builder().build(), unionActorRef);
        });
    }

    @Override
    public Receive<Protocol> createReceive() {
        return newReceiveBuilder()
                .onMessage(SketchEvent.class, this::onSketchEvent)
                .onMessage(PersistRequest.class, this::onPersist)
                .build();
    }

    private Behavior<Protocol> onSketchEvent(SketchEvent event) {
        updateSketch.update(event.getValue());
        return Behaviors.same();
    }

    private Behavior<Protocol> onPersist(PersistRequest persist) {
        unionActorRef.tell(new SketchUnionActor.UnionSketch(updateSketch.compact()));
        return Behaviors.same();
    }

}

SketchUnionActorは、SketchActorからコンパクションされたスケッチを受け取り、Unionにマージしていく。また、定期的にスナップショットをファイルとして公開している。

import akka.actor.typed.Behavior;
import akka.actor.typed.javadsl.*;
import org.apache.datasketches.theta.*;

import java.io.FileOutputStream;
import java.io.IOException;
import java.time.Duration;

public class SketchUnionActor extends AbstractBehavior<SketchUnionActor.Protocol> {

    public interface Protocol {}

    public static final class UnionSketch implements Protocol {
        private final Sketch sketch;
        public UnionSketch(Sketch sketch) {
            this.sketch = sketch;
        }
        public Sketch getSketch() {
            return sketch;
        }
    }

    private static final class SnapshotRequest implements Protocol {
        private SnapshotRequest() {}
        private final static SnapshotRequest instance = new SnapshotRequest();
        public static SnapshotRequest getInstance() {
            return instance;
        }
    }

    private final Union union;

    private SketchUnionActor(ActorContext<Protocol> context) {
        super(context);
        union = SetOperation.builder().buildUnion();
    }

    public static Behavior<Protocol> create(Duration snapshotInitialDuration, Duration snapshotRate) {
        return Behaviors.setup(ctx -> {
            ctx.getSystem().scheduler()
                    .scheduleAtFixedRate(
                            snapshotInitialDuration, snapshotRate,
                            () -> ctx.getSelf().tell(SnapshotRequest.getInstance()),
                            ctx.getSystem().executionContext());

            return new SketchUnionActor(ctx);
        });
    }

    @Override
    public Receive<Protocol> createReceive() {
        return newReceiveBuilder()
                .onMessage(UnionSketch.class, this::onUnionSketch)
                .onMessage(SnapshotRequest.class, this::onSnapshotRequest)
                .build();
    }

    private Behavior<Protocol> onUnionSketch(UnionSketch unionSketch) {
        union.update(unionSketch.getSketch());
        return Behaviors.same();
    }

    private Behavior<Protocol> onSnapshotRequest(SnapshotRequest snapshotRequest) {
        final var compacted = union.getResult();
        final var snapshot = compacted.toByteArray();

        try (final var out = new FileOutputStream("snapshot.bin")) {
            System.out.println("snapshot start.");
            out.write(snapshot);
            System.out.println("snapshot end.");
            System.out.println(compacted.toString());
        } catch (IOException e) {
            e.printStackTrace();
        }

        return Behaviors.same();
    }

}

main関数。Actorを適当にセットアップして、適当に値を送りつける。

import java.time.Duration;
import java.util.stream.IntStream;

import akka.actor.typed.ActorSystem;
import akka.actor.typed.javadsl.AbstractBehavior;
import akka.actor.typed.javadsl.Behaviors;
import akka.actor.typed.javadsl.Receive;

/**
 * Entry point of the example. Starts an actor system whose guardian spawns one
 * {@code SketchUnionActor} and two {@code SketchActor}s, then sends two
 * overlapping ranges of integers to the two sketch actors.
 */
public final class Main {

    public static void main(String[] args) {
        final var guardian = Behaviors.<Object>setup(ctx -> {
            // Periods used for the union snapshot and for each sketch actor's hand-off.
            final var snapshotPeriod = Duration.ofSeconds(180);
            final var handOffPeriod = Duration.ofSeconds(30);

            final var union = ctx
                    .spawn(SketchUnionActor.create(snapshotPeriod, snapshotPeriod), "union")
                    .<SketchUnionActor.UnionSketch>narrow();
            final var sketch1 = ctx.spawn(
                    SketchActor.create(union, handOffPeriod, handOffPeriod), "sketch1");
            final var sketch2 = ctx.spawn(
                    SketchActor.create(union, handOffPeriod, handOffPeriod), "sketch2");

            // The two ranges overlap on [50000, 100000).
            for (int value = 0; value < 100000; value++) {
                sketch1.tell(new SketchActor.SketchEvent(value));
            }
            for (int value = 50000; value < 150000; value++) {
                sketch2.tell(new SketchActor.SketchEvent(value));
            }

            // The guardian itself registers no message handlers.
            return new AbstractBehavior<Object>(ctx) {
                @Override
                public Receive<Object> createReceive() {
                    return newReceiveBuilder().build();
                }
            };
        });

        ActorSystem.create(guardian, "system");
    }

}

上のコードだと、30秒間隔で各SketchActorがSketchUnionActorにコンパクションされたSketchを送り、180秒間隔でSketchUnionActorがスナップショットを公開する。以下は実行後180秒ほど経過した時のstdout。

snapshot start.
snapshot end.

### HeapCompactOrderedSketch SUMMARY: 
   Estimate                : 149586.73149344584
   Upper Bound, 95% conf   : 154287.5017892762
   Lower Bound, 95% conf   : 145028.6046846571
   Theta (double)          : 0.027382107751846067
   Theta (long)            : 252555366948521403
   Theta (long) hex        : 038141c4a515c5bb
   EstMode?                : true
   Empty?                  : false
   Retained Entries        : 4096
   Seed Hash               : 93cc | 37836
### END SKETCH SUMMARY

Akka JavaDSL、案外ScalaDSLと差異は少なく、使いやすそう。