Concurrent Theta Sketch with Akka Typed JavaDSL
A memo from simply trying out Akka Typed JavaDSL and DataSketches. Of the sketches DataSketches offers, this uses the Theta Sketch Framework. What this post does is almost the same as the following.
You don't have to write a Concurrent Theta Sketch yourself. DataSketches officially provides UpdateSketchBuilder#buildShared and UpdateSketchBuilder#buildLocal, so use those for real work.
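For readers new to theta sketches, the core idea can be shown with a toy KMV-style sketch:

- Hash every value.
- Keep only the k smallest hashes.
- Use the boundary as the threshold theta, expressed as a fraction of the hash range.

The distinct-count estimate is then the number of retained hashes divided by theta. This is a simplified illustration under stated assumptions, not the DataSketches implementation. The class name `ToyThetaSketch` and the SplitMix64-based hash are hypothetical stand-ins; DataSketches itself uses MurmurHash3 and a hash table.

```java
import java.util.TreeSet;

/** Hypothetical toy theta (KMV) sketch, for illustration only; not DataSketches code. */
final class ToyThetaSketch {
  private final int k;                          // nominal entries
  private final TreeSet<Long> hashes = new TreeSet<>();
  private long thetaLong = Long.MAX_VALUE;      // only hashes < thetaLong are kept

  ToyThetaSketch(int k) {
    this.k = k;
  }

  // Stand-in hash: SplitMix64 finalizer, shifted to a non-negative 63-bit value.
  static long hash(long value) {
    long z = value + 0x9e3779b97f4a7c15L;
    z = (z ^ (z >>> 30)) * 0xbf58476d1ce4e5b9L;
    z = (z ^ (z >>> 27)) * 0x94d049bb133111ebL;
    return (z ^ (z >>> 31)) >>> 1;
  }

  void update(long value) {
    long h = hash(value);
    // Skip hashes at or above the threshold, and duplicates (TreeSet.add returns false).
    if (h >= thetaLong || !hashes.add(h)) return;
    if (hashes.size() > k) {
      // Evict the largest hash; it becomes the new exclusive threshold.
      thetaLong = hashes.pollLast();
    }
  }

  double theta() {
    return thetaLong / (double) Long.MAX_VALUE; // 1.0 while in exact mode
  }

  int retained() {
    return hashes.size();
  }

  double estimate() {
    return retained() / theta();
  }

  public static void main(String[] args) {
    ToyThetaSketch sketch = new ToyThetaSketch(4096);
    for (long i = 0; i < 150_000; i++) sketch.update(i);
    System.out.printf("retained=%d theta=%.6f estimate=%.1f%n",
        sketch.retained(), sketch.theta(), sketch.estimate());
  }
}
```

With k = 4096 (the DataSketches default nominal entries), this should give an estimate close to 150,000. The digits will not match the DataSketches output below, because the hash function differs. While fewer than k distinct values have been seen, theta stays at 1.0 and the estimate is exact.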
First, to use Akka Typed via Gradle, you need at minimum to apply the scala plugin. On top of that, add a suitable Akka Typed dependency and the DataSketches dependency.
```groovy
plugins {
  id 'java'
  id 'scala'
}

repositories {
  mavenCentral()
}

dependencies {
  implementation group: 'org.apache.datasketches', name: 'datasketches-java', version: '1.2.0-incubating'
  implementation group: 'com.typesafe.akka', name: 'akka-actor-typed_2.13', version: '2.6.4'
}
```
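Prepare a SketchActor that updates a local sketch. It owns a single local sketch and updates it safely: because an actor processes one message at a time, the sketch is never mutated concurrently. It also periodically sends a compacted copy of the local sketch to SketchUnionActor, described below.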
```java
import akka.actor.typed.ActorRef;
import akka.actor.typed.Behavior;
import akka.actor.typed.javadsl.*;
import org.apache.datasketches.theta.UpdateSketch;

import java.time.Duration;

public class SketchActor extends AbstractBehavior<SketchActor.Protocol> {
  public interface Protocol {}

  /** A value to add to the local sketch. */
  public static final class SketchEvent implements Protocol {
    private final long value;

    public SketchEvent(long value) {
      this.value = value;
    }

    public long getValue() {
      return value;
    }
  }

  /** Self-message that triggers sending a compacted copy to the union actor. */
  private static final class PersistRequest implements Protocol {
    private PersistRequest() {}

    private static final PersistRequest instance = new PersistRequest();

    public static PersistRequest getInstance() {
      return instance;
    }
  }

  // Mutable local sketch; only touched while this actor is processing a message.
  private final UpdateSketch updateSketch;
  private final ActorRef<SketchUnionActor.UnionSketch> unionActorRef;

  private SketchActor(
      ActorContext<Protocol> context,
      UpdateSketch updateSketch,
      ActorRef<SketchUnionActor.UnionSketch> unionActorRef) {
    super(context);
    this.updateSketch = updateSketch;
    this.unionActorRef = unionActorRef;
  }

  public static Behavior<Protocol> create(
      ActorRef<SketchUnionActor.UnionSketch> unionActorRef,
      Duration unionInitialDuration,
      Duration unionRate) {
    return Behaviors.setup(ctx -> {
      // Periodically send a PersistRequest to ourselves.
      ctx.getSystem().scheduler()
          .scheduleAtFixedRate(
              unionInitialDuration,
              unionRate,
              () -> ctx.getSelf().tell(PersistRequest.getInstance()),
              ctx.getSystem().executionContext());
      return new SketchActor(ctx, UpdateSketch.builder().build(), unionActorRef);
    });
  }

  @Override
  public Receive<Protocol> createReceive() {
    return newReceiveBuilder()
        .onMessage(SketchEvent.class, this::onSketchEvent)
        .onMessage(PersistRequest.class, this::onPersist)
        .build();
  }

  private Behavior<Protocol> onSketchEvent(SketchEvent event) {
    updateSketch.update(event.getValue());
    return Behaviors.same();
  }

  private Behavior<Protocol> onPersist(PersistRequest persist) {
    // compact() returns an immutable copy, so it is safe to hand to another actor.
    unionActorRef.tell(new SketchUnionActor.UnionSketch(updateSketch.compact()));
    return Behaviors.same();
  }
}
```
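SketchActor's safety rests on a single-writer pattern:

- Its mutable UpdateSketch is only touched while the actor processes one message at a time.
- Other actors only ever receive an immutable `compact()` copy.

The same pattern can be sketched without Akka, using plain `java.util.concurrent`. This assumes a single-thread executor stands in for the actor's mailbox and a `HashSet` stands in for the UpdateSketch. `ConfinedOwner`, `tell`, and `publishAndWait` are hypothetical names chosen for the analogy, not Akka API.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Hypothetical non-Akka analogue of SketchActor: state confined to one thread. */
final class ConfinedOwner implements AutoCloseable {
  // One thread plays the actor: tasks run one at a time, in submission order.
  private final ScheduledExecutorService mailbox = Executors.newSingleThreadScheduledExecutor();
  // Stand-in for the mutable UpdateSketch; only read or written on the mailbox thread.
  private final Set<Long> localState = new HashSet<>();
  private final Consumer<Set<Long>> downstream;

  ConfinedOwner(Consumer<Set<Long>> downstream, long initialDelayMs, long periodMs) {
    this.downstream = downstream;
    // Like PersistRequest: periodically hand off an immutable copy.
    mailbox.scheduleAtFixedRate(this::publish, initialDelayMs, periodMs, TimeUnit.MILLISECONDS);
  }

  /** Like tell(new SketchEvent(value)): enqueue instead of mutating on the caller's thread. */
  void tell(long value) {
    mailbox.execute(() -> localState.add(value));
  }

  /** Runs a publish after all previously enqueued updates and waits for it. */
  void publishAndWait() {
    try {
      mailbox.submit(this::publish).get();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    } catch (ExecutionException e) {
      throw new IllegalStateException(e.getCause());
    }
  }

  // Runs on the mailbox thread; Set.copyOf plays the role of updateSketch.compact().
  private void publish() {
    downstream.accept(Set.copyOf(localState));
  }

  @Override
  public void close() {
    mailbox.shutdownNow();
  }

  public static void main(String[] args) {
    try (ConfinedOwner owner = new ConfinedOwner(
        snapshot -> System.out.println("snapshot size: " + snapshot.size()), 60_000, 60_000)) {
      for (long i = 0; i < 1_000; i++) owner.tell(i);
      owner.publishAndWait(); // prints "snapshot size: 1000"
    }
  }
}
```

In Akka, the mailbox and the one-message-at-a-time guarantee come with the actor, which is why SketchActor needs no locks.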
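SketchUnionActor receives the compacted sketches from the SketchActors and merges them into a Union. It also periodically publishes a snapshot of the union result as a file (snapshot.bin).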
```java
import akka.actor.typed.Behavior;
import akka.actor.typed.javadsl.*;
import org.apache.datasketches.theta.*;

import java.io.FileOutputStream;
import java.io.IOException;
import java.time.Duration;

public class SketchUnionActor extends AbstractBehavior<SketchUnionActor.Protocol> {
  public interface Protocol {}

  /** A compacted sketch sent by a SketchActor. */
  public static final class UnionSketch implements Protocol {
    private final Sketch sketch;

    public UnionSketch(Sketch sketch) {
      this.sketch = sketch;
    }

    public Sketch getSketch() {
      return sketch;
    }
  }

  /** Self-message that triggers writing a snapshot. */
  private static final class SnapshotRequest implements Protocol {
    private SnapshotRequest() {}

    private static final SnapshotRequest instance = new SnapshotRequest();

    public static SnapshotRequest getInstance() {
      return instance;
    }
  }

  private final Union union;

  private SketchUnionActor(ActorContext<Protocol> context) {
    super(context);
    union = SetOperation.builder().buildUnion();
  }

  public static Behavior<Protocol> create(Duration snapshotInitialDuration, Duration snapshotRate) {
    return Behaviors.setup(ctx -> {
      // Periodically send a SnapshotRequest to ourselves.
      ctx.getSystem().scheduler()
          .scheduleAtFixedRate(
              snapshotInitialDuration,
              snapshotRate,
              () -> ctx.getSelf().tell(SnapshotRequest.getInstance()),
              ctx.getSystem().executionContext());
      return new SketchUnionActor(ctx);
    });
  }

  @Override
  public Receive<Protocol> createReceive() {
    return newReceiveBuilder()
        .onMessage(UnionSketch.class, this::onUnionSketch)
        .onMessage(SnapshotRequest.class, this::onSnapshotRequest)
        .build();
  }

  private Behavior<Protocol> onUnionSketch(UnionSketch unionSketch) {
    // Merge the incoming compact sketch into the running union.
    union.update(unionSketch.getSketch());
    return Behaviors.same();
  }

  private Behavior<Protocol> onSnapshotRequest(SnapshotRequest snapshotRequest) {
    final var compacted = union.getResult();
    final var snapshot = compacted.toByteArray();
    // Note: this blocks the actor's thread while writing; fine for a demo.
    try (final var out = new FileOutputStream("snapshot.bin")) {
      System.out.println("snapshot start.");
      out.write(snapshot);
      System.out.println("snapshot end.");
      System.out.println(compacted.toString());
    } catch (IOException e) {
      e.printStackTrace();
    }
    return Behaviors.same();
  }
}
```
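SketchActor never resets its local sketch. The union actor therefore receives ever-growing snapshots of the same two sketches every 30 seconds. That does not inflate the count, because a theta union works on hash sets:

- The union's theta is the minimum of the input thetas.
- Only hashes below that theta survive, duplicates collapse, and the result is trimmed back to k.

Here is a minimal sketch of that merge rule. It assumes a toy representation (a `thetaLong` plus the retained hashes) rather than DataSketches' `Union` class; `ToyCompactSketch` and `union` are hypothetical names.

```java
import java.util.Arrays;
import java.util.TreeSet;

/** Hypothetical toy compact sketch: an exclusive threshold plus the retained hashes. */
final class ToyCompactSketch {
  final long thetaLong; // every retained hash is < thetaLong
  final long[] hashes;  // sorted ascending, distinct

  ToyCompactSketch(long thetaLong, long... hashes) {
    this.thetaLong = thetaLong;
    this.hashes = Arrays.stream(hashes).filter(h -> h < thetaLong).distinct().sorted().toArray();
  }

  double estimate() {
    return hashes.length / (thetaLong / (double) Long.MAX_VALUE);
  }

  /** Theta-style union with nominal size k. */
  static ToyCompactSketch union(int k, ToyCompactSketch... inputs) {
    long theta = Long.MAX_VALUE;
    for (ToyCompactSketch s : inputs) theta = Math.min(theta, s.thetaLong); // min of thetas
    TreeSet<Long> kept = new TreeSet<>();
    for (ToyCompactSketch s : inputs) {
      for (long h : s.hashes) {
        if (h < theta) kept.add(h); // set semantics: a hash seen twice counts once
      }
    }
    // Trim to k; the smallest evicted hash becomes the new threshold.
    while (kept.size() > k) theta = kept.pollLast();
    return new ToyCompactSketch(theta, kept.stream().mapToLong(Long::longValue).toArray());
  }

  public static void main(String[] args) {
    long max = Long.MAX_VALUE;
    ToyCompactSketch first = new ToyCompactSketch(max, 10, 20, 30);     // early snapshot
    ToyCompactSketch later = new ToyCompactSketch(max, 10, 20, 30, 40); // same sketch, grown
    ToyCompactSketch other = new ToyCompactSketch(max, 30, 50);         // another actor
    System.out.println(union(4096, first, later, other).estimate());   // 5.0
    System.out.println(union(4096, first, first, first).estimate());   // 3.0
  }
}
```

The second line shows the property the demo relies on: re-sending the same snapshot leaves the union unchanged.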
The main function sets up the actors and sends them some values.
```java
import akka.actor.typed.ActorSystem;
import akka.actor.typed.Behavior;
import akka.actor.typed.javadsl.Behaviors;

import java.time.Duration;
import java.util.stream.IntStream;

public final class Main {
  public static void main(String[] args) {
    final Behavior<Void> guardian = Behaviors.setup(ctx -> {
      // Union actor: publishes a snapshot every 180 seconds.
      final var union = ctx.spawn(
          SketchUnionActor.create(Duration.ofSeconds(180), Duration.ofSeconds(180)),
          "union").<SketchUnionActor.UnionSketch>narrow();
      // Two local sketch actors: each sends a compacted sketch every 30 seconds.
      final var sketch1 = ctx.spawn(
          SketchActor.create(union, Duration.ofSeconds(30), Duration.ofSeconds(30)), "sketch1");
      final var sketch2 = ctx.spawn(
          SketchActor.create(union, Duration.ofSeconds(30), Duration.ofSeconds(30)), "sketch2");
      // Overlapping ranges: 0..99,999 and 50,000..149,999.
      IntStream.range(0, 100000).forEach(i -> sketch1.tell(new SketchActor.SketchEvent(i)));
      IntStream.range(50000, 150000).forEach(i -> sketch2.tell(new SketchActor.SketchEvent(i)));
      // The guardian itself handles no messages.
      return Behaviors.empty();
    });
    ActorSystem.create(guardian, "system");
  }
}
```
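With the code above, each SketchActor sends a compacted sketch to SketchUnionActor every 30 seconds, and SketchUnionActor publishes a snapshot every 180 seconds. Since sketch1 receives 0 to 99,999 and sketch2 receives 50,000 to 149,999, the true number of distinct values is 150,000. Below is the stdout about 180 seconds after startup.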
```
snapshot start.
snapshot end.

### HeapCompactOrderedSketch SUMMARY:
   Estimate                : 149586.73149344584
   Upper Bound, 95% conf   : 154287.5017892762
   Lower Bound, 95% conf   : 145028.6046846571
   Theta (double)          : 0.027382107751846067
   Theta (long)            : 252555366948521403
   Theta (long) hex        : 038141c4a515c5bb
   EstMode?                : true
   Empty?                  : false
   Retained Entries        : 4096
   Seed Hash               : 93cc | 37836
### END SKETCH SUMMARY
```
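The summary numbers hang together in a simple way that can be checked by hand:

- Theta (double) is Theta (long) divided by `Long.MAX_VALUE`.
- Estimate is Retained Entries divided by Theta (double).

The small sketch below just redoes that arithmetic with the values copied from the output above. It also counts the true distinct values of the two input ranges. `SummaryCheck` is a hypothetical name.

```java
import java.util.stream.LongStream;

/** Hypothetical helper that re-derives the printed summary values. */
final class SummaryCheck {
  static final long THETA_LONG = 252555366948521403L; // "Theta (long)"
  static final int RETAINED = 4096;                    // "Retained Entries"

  static double theta() {
    // Fraction of the 63-bit hash range below the threshold (about 0.0274).
    return THETA_LONG / (double) Long.MAX_VALUE;
  }

  static double estimate() {
    // Retained hashes scaled up by 1 / theta (about 149586.73).
    return RETAINED / theta();
  }

  static long trueDistinct() {
    // sketch1 got 0..99,999 and sketch2 got 50,000..149,999; count the union exactly.
    return LongStream.concat(LongStream.range(0, 100_000), LongStream.range(50_000, 150_000))
        .distinct()
        .count();
  }

  public static void main(String[] args) {
    System.out.println(String.format("%016x", THETA_LONG)); // 038141c4a515c5bb
    System.out.println(theta());
    System.out.println(estimate());
    System.out.println(trueDistinct());                      // 150000
    System.out.printf("relative error: %.2f%%%n",
        100.0 * (trueDistinct() - estimate()) / trueDistinct());
  }
}
```

The estimate is off by roughly 0.3%. For k = 4096, the relative standard error of a theta sketch is roughly 1/sqrt(4096) ≈ 1.6%, so this is well within the printed 95% bounds.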
Akka's JavaDSL turned out to differ surprisingly little from the ScalaDSL, and it seems easy to use.