
SOLVED: Jet RoutingPolicy.ISOLATED not working as expected

Kleyson Rios:

I'm trying to use RoutingPolicy.ISOLATED to create dedicated connections between the upstream and downstream processors, as suggested in the thread "Creating a new Jet custom Partitioner".

I'm also using DiagnosticProcessors.peekOutputP to check that messages from the same Kafka partition are sent to the same downstream processor.
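To make the expectation concrete, here is a minimal plain-Java sketch of what "dedicated connections" would mean for two upstream and two downstream processors. It is only a model: the class name IsolatedRoutingSketch, the method assign, and the round-robin assignment rule are assumptions made for illustration, not Jet's actual implementation.

```java
import java.util.ArrayList;
import java.util.List;

public class IsolatedRoutingSketch {

    // Hypothetical model: gives every upstream processor its own disjoint
    // set of downstream processors (downstream d belongs to upstream d % U).
    static List<List<Integer>> assign(int upstream, int downstream) {
        if (upstream <= 0 || downstream < upstream) {
            throw new IllegalArgumentException("sketch assumes 0 < upstream <= downstream");
        }
        List<List<Integer>> result = new ArrayList<>();
        for (int u = 0; u < upstream; u++) {
            result.add(new ArrayList<>());
        }
        for (int d = 0; d < downstream; d++) {
            result.get(d % upstream).add(d);
        }
        return result;
    }

    public static void main(String[] args) {
        // Same parallelism on both sides, as in the DAG in this question.
        System.out.println(assign(2, 2)); // prints [[0], [1]]
    }
}
```

Under this model, upstream processor 0 only ever feeds downstream processor 0 and upstream processor 1 only feeds downstream processor 1, which is the behavior I expect from the isolated edge.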

Below are the logs from Jet initialization:


45:29,049 Loading hazelcast-jet-default.xml from classpath.
45:29,225 Loading hazelcast-jet-member-default.xml from classpath.
45:30,293 [172.21.0.1]:5701 [jet] [0.5.1] Starting Jet 0.5.1 (20171206 - a2156c6)
45:30,294 [172.21.0.1]:5701 [jet] [0.5.1] Setting number of cooperative threads and default parallelism to 8
45:30,294 [172.21.0.1]:5701 [jet] [0.5.1]
[Hazelcast Jet ASCII-art startup banner]
45:30,294 [172.21.0.1]:5701 [jet] [0.5.1] Copyright (c) 2008-2017, Hazelcast, Inc. All Rights Reserved.
45:32,256 [172.21.0.1]:5701 [jet] [0.5.1]

Members {size:1, ver:1} [
Member [172.21.0.1]:5701 - 71c9778e-ce8d-474e-9c8d-08616a229328 this
]

45:32,441 [172.21.0.1]:5701 [jet] [0.5.1] Starting job 0300-0f31-8123-97aa based on join/submit request from client
45:32,481 [172.21.0.1]:5701 [jet] [0.5.1] Start executing job 0300-0f31-8123-97aa, execution f272-4051-f961-34c2, status STARTING
dag
.vertex("kafkaSource").localParallelism(2)
.vertex("meta").localParallelism(2)
.vertex("sink").localParallelism(1)
.edge(between("kafkaSource", "meta").isolated())
.edge(between("meta", "sink").partitioned(?))

45:32,551 [172.21.0.1]:5701 [jet] [0.5.1] Execution plan for job 0300-0f31-8123-97aa, execution f272-4051-f961-34c2 initialized
45:32,555 [172.21.0.1]:5701 [jet] [0.5.1] Start execution of job 0300-0f31-8123-97aa, execution f272-4051-f961-34c2 from coordinator [172.21.0.1]:5701
45:32,758 The configuration 'compression.type' was supplied but isn't a known config.
45:32,758 The configuration 'compression.type' was supplied but isn't a known config.
45:32,957 [172.21.0.1]:5701 [jet] [0.5.1] Partition assignments changed, new partitions: [jet-stream-0]
45:32,957 [172.21.0.1]:5701 [jet] [0.5.1] Partition assignments changed, new partitions: [jet-stream-1]

My Kafka producer is routing messages with the same key to the same partition:


log4j:WARN No appenders could be found for logger (org.apache.kafka.clients.producer.ProducerConfig).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
key=src1 partition=1
key=src1 partition=1
key=src0 partition=0
key=src1 partition=1
key=src0 partition=0
key=src1 partition=1
key=src1 partition=1
key=src0 partition=0
key=src0 partition=0
key=src1 partition=1
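The key-to-partition behavior in the output above can be illustrated with a short plain-Java sketch. Kafka's default partitioner hashes the serialized key (with murmur2) and takes the result modulo the number of partitions. The sketch below substitutes String.hashCode() for murmur2 so it stays self-contained, so the class name KeyPartitionSketch, the method partitionFor, and the hash itself are stand-ins rather than the producer's real code.

```java
public class KeyPartitionSketch {

    // Stand-in for the producer's partitioner: a deterministic hash of the
    // key, reduced modulo the partition count. Kafka itself uses murmur2 on
    // the serialized key bytes; String.hashCode() is an assumption made here
    // only to avoid external dependencies.
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        String[] keys = {"src1", "src1", "src0", "src1", "src0"};
        for (String key : keys) {
            System.out.println("key=" + key + " partition=" + partitionFor(key, 2));
        }
    }
}
```

Because the mapping depends only on the key and the partition count, every record with the same key lands in the same partition, which is what the producer output shows.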

Based on the logs, it looks like the kafkaSource vertex is outputting all messages to the same processor:


54:39,991 [172.21.0.1]:5701 [jet] [0.5.1] Output to 0: com.krios.iot.hazelcast.KafkaMessage@6da59b20
54:39,993 [172.21.0.1]:5701 [jet] [0.5.1] src1 / Frame #0
54:40,842 [172.21.0.1]:5701 [jet] [0.5.1] Output to 0: com.krios.iot.hazelcast.KafkaMessage@69c2e820
54:40,844 [172.21.0.1]:5701 [jet] [0.5.1] src1 / Frame #1
54:41,843 [172.21.0.1]:5701 [jet] [0.5.1] Output to 0: com.krios.iot.hazelcast.KafkaMessage@489aca4c
54:41,845 [172.21.0.1]:5701 [jet] [0.5.1] src0 / Frame #2
54:42,838 [172.21.0.1]:5701 [jet] [0.5.1] Output to 0: com.krios.iot.hazelcast.KafkaMessage@2e0d6d1f
54:42,838 [172.21.0.1]:5701 [jet] [0.5.1] src1 / Frame #3
54:43,840 [172.21.0.1]:5701 [jet] [0.5.1] Output to 0: com.krios.iot.hazelcast.KafkaMessage@e1043d3
54:43,841 [172.21.0.1]:5701 [jet] [0.5.1] src0 / Frame #4
54:44,853 [172.21.0.1]:5701 [jet] [0.5.1] Output to 0: com.krios.iot.hazelcast.KafkaMessage@4e6aabe6
54:44,854 [172.21.0.1]:5701 [jet] [0.5.1] src1 / Frame #5
54:45,842 [172.21.0.1]:5701 [jet] [0.5.1] Output to 0: com.krios.iot.hazelcast.KafkaMessage@4ca9eb8a
54:45,842 [172.21.0.1]:5701 [jet] [0.5.1] src1 / Frame #6
54:46,847 [172.21.0.1]:5701 [jet] [0.5.1] Output to 0: com.krios.iot.hazelcast.KafkaMessage@10c630f7
54:46,849 [172.21.0.1]:5701 [jet] [0.5.1] src0 / Frame #7
54:47,848 [172.21.0.1]:5701 [jet] [0.5.1] Output to 0: com.krios.iot.hazelcast.KafkaMessage@4639cdac
54:47,849 [172.21.0.1]:5701 [jet] [0.5.1] src0 / Frame #8
54:48,843 [172.21.0.1]:5701 [jet] [0.5.1] Output to 0: com.krios.iot.hazelcast.KafkaMessage@325499d7
54:48,844 [172.21.0.1]:5701 [jet] [0.5.1] src1 / Frame #9
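One possible reading of these lines, stated here as an assumption that the logs alone do not confirm: the number after "Output to" may identify the outbound edge (its ordinal) rather than the index of the receiving processor. Since kafkaSource has a single outbound edge, every line would then read "Output to 0" even if the items were going to different downstream processors. The plain-Java sketch below models that situation. PeekOrdinalSketch and emit are hypothetical names and none of this is Jet code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

public class PeekOrdinalSketch {

    // Log lines as a peeking wrapper might produce them (hypothetical model).
    static final List<String> LOG = new ArrayList<>();
    // Indices of the modeled downstream processors that received an item.
    static final TreeSet<Integer> RECEIVERS = new TreeSet<>();

    // Models an upstream processor emitting on an isolated edge with equal
    // parallelism on both sides: upstream i hands its items to downstream i.
    static void emit(int upstreamIndex, int edgeOrdinal, String item) {
        // The log line records the edge ordinal, not the receiver.
        LOG.add("Output to " + edgeOrdinal + ": " + item);
        RECEIVERS.add(upstreamIndex);
    }

    public static void main(String[] args) {
        emit(0, 0, "src0 / Frame #0");
        emit(1, 0, "src1 / Frame #1");
        LOG.forEach(System.out::println);
        System.out.println("receivers = " + RECEIVERS);
    }
}
```

In this model both log lines start with "Output to 0" while two different downstream processors receive the items, so the log text by itself would not show whether the isolated edge is working.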

Below is my Jet code:


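// Build the DAG: Kafka source -> parse -> logger sink.
DAG dag = new DAG();
DecodeKafkaMessage decodeKafkaMessage = new DecodeKafkaMessage();

// Source: two local processors; peekOutputP logs every item the source emits.
Vertex kafkaSource = dag.newVertex("kafkaSource",
        DiagnosticProcessors.peekOutputP(streamKafkaP(properties, decodeKafkaMessage, topic)))
    .localParallelism(2);

// Parsing stage, with the same local parallelism as the source.
Vertex meta = dag.newVertex("meta", mapP(LogLine::parse))
    .localParallelism(2);

// Sink: a single processor that writes each item to the log.
Vertex sink = dag.newVertex("sink", DiagnosticProcessors.writeLoggerP())
    .localParallelism(1);

// Isolated edge from source to meta; all-to-one edge from meta to sink.
dag.edge(between(kafkaSource, meta).isolated())
   .edge(between(meta, sink).allToOne());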


