Skip to content

Commit

Permalink
Remove calls to deprecated API
Browse files Browse the repository at this point in the history
  • Loading branch information
Gilles Philippart committed Jun 9, 2022
1 parent 9edf72c commit b685609
Show file tree
Hide file tree
Showing 14 changed files with 84 additions and 45 deletions.
2 changes: 1 addition & 1 deletion src/jackdaw/admin.clj
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
(def client-impl
{:alter-topics* (fn [this topics]
(d/future
@(.all (.alterConfigs ^AdminClient this topics))))
@(.all (.incrementalAlterConfigs ^AdminClient this topics))))
:create-topics* (fn [this topics]
(d/future
@(.all (.createTopics ^AdminClient this topics))))
Expand Down
2 changes: 1 addition & 1 deletion src/jackdaw/client.clj
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@
of datafied messages."
[^Consumer consumer timeout]
(some->> (if (int? timeout)
(.poll consumer ^long timeout)
(.poll consumer (Duration/ofMillis timeout))
(.poll consumer ^Duration timeout))
(map jd/datafy)))

Expand Down
6 changes: 6 additions & 0 deletions src/jackdaw/streams.clj
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,12 @@
[kstream predicate-fns]
(p/branch kstream predicate-fns))

(defn split
"Returns a list of KStreams, one for each of the `predicate-fns`
provided."
[kstream predicate-fns]
(p/split kstream predicate-fns))

(defn flat-map
"Creates a KStream that will consist of the concatenation of messages
returned by calling `key-value-mapper-fn` on each key/value pair in the
Expand Down
8 changes: 6 additions & 2 deletions src/jackdaw/streams/configured.clj
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
(:refer-clojure :exclude [count map reduce group-by merge filter peek])
(:require [jackdaw.streams.protocols :refer [IGlobalKTable IKGroupedBase IKGroupedStream IKGroupedTable IKStream
IKStreamBase IKTable ISessionWindowedKStream IStreamsBuilder
ITimeWindowedKStream aggregate branch count filter filter-not flat-map
ITimeWindowedKStream aggregate branch split count filter filter-not flat-map
flat-map-values for-each! global-ktable global-ktable* group-by
group-by-key join join-global join-windowed kgroupedtable* kstream
kstream* kstreams ktable ktable* left-join left-join-global
Expand Down Expand Up @@ -153,9 +153,13 @@

IKStream
(branch
[this predicate-fns]
(split this predicate-fns))

(split
[_ predicate-fns]
(mapv (partial configured-kstream config)
(branch kstream predicate-fns)))
(split kstream predicate-fns)))

(flat-map
[_ key-value-mapper-fn]
Expand Down
2 changes: 1 addition & 1 deletion src/jackdaw/streams/describe.clj
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
(base-node :node n))

(defmethod describe-node :source [n]
(let [topics (map str/trim (-> (.topics ^TopologyDescription$Source n)
(let [topics (map str/trim (-> (.topicSet ^TopologyDescription$Source n)
(str/replace "[" "")
(str/replace "]" "")
(str/split #",")))]
Expand Down
37 changes: 22 additions & 15 deletions src/jackdaw/streams/interop.clj
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@
[org.apache.kafka.streams
StreamsBuilder]
[org.apache.kafka.streams.kstream
Aggregator Consumed GlobalKTable Grouped Initializer Joined
Aggregator Consumed GlobalKTable Grouped Initializer Joined StreamJoined
JoinWindows KGroupedStream KGroupedTable KStream KTable
KeyValueMapper Materialized Merger Predicate Printed Produced
Reducer SessionWindowedKStream SessionWindows
Suppressed Suppressed$BufferConfig TimeWindowedKStream ValueJoiner
ValueMapper ValueTransformerSupplier Windows ForeachAction TransformerSupplier]
ValueMapper ValueTransformerSupplier Windows ForeachAction TransformerSupplier Repartitioned]
[org.apache.kafka.streams.state Stores]
(org.apache.kafka.streams.processor.api
ProcessorSupplier)))
Expand Down Expand Up @@ -138,7 +138,7 @@
key-serde
value-serde))
builder)

(streams-builder*
[_]
streams-builder))
Expand Down Expand Up @@ -192,12 +192,13 @@
(group-by
[_ key-value-mapper-fn]
(clj-kgroupedstream
(.groupBy kstream ^KeyValueMapper (select-key-value-mapper key-value-mapper-fn))))
(.groupBy ^KStream kstream
^KeyValueMapper (select-key-value-mapper key-value-mapper-fn))))

(group-by
[_ key-value-mapper-fn topic-config]
(clj-kgroupedstream
(.groupBy kstream
(.groupBy ^KStream kstream
^KeyValueMapper (select-key-value-mapper key-value-mapper-fn)
^Grouped (topic->grouped topic-config))))

Expand All @@ -211,7 +212,7 @@
[_ predicate-fns]
(mapv clj-kstream
(->> (into-array Predicate (mapv predicate predicate-fns))
(.branch kstream))))
(.split kstream))))

(flat-map
[_ key-value-mapper-fn]
Expand All @@ -229,13 +230,19 @@
nil)

(through
[_ {:keys [topic-name] :as topic-config}]
[_ {:keys [topic-name key-serde value-serde partition-fn] :as topic-config}]
(clj-kstream
(.through kstream topic-name ^Produced (topic->produced topic-config))))
(.repartition ^KStream kstream
(cond-> (Repartitioned/with key-serde value-serde)
(some? topic-name) (.withName topic-name)
(some? partition-fn) (.withStreamPartitioner
(->FnStreamPartitioner partition-fn))))))

(to!
[_ {:keys [topic-name] :as topic-config}]
(.to kstream ^String topic-name ^Produced (topic->produced topic-config))
(.to ^KStream kstream
^String topic-name
^Produced (topic->produced topic-config))
nil)

(flat-map-values
Expand Down Expand Up @@ -267,11 +274,11 @@
{key-serde :key-serde this-value-serde :value-serde}
{other-value-serde :value-serde}]
(clj-kstream
(.join kstream
(.join ^KStream kstream
^KStream (kstream* other-kstream)
^ValueJoiner (value-joiner value-joiner-fn)
^JoinWindows windows
(Joined/with key-serde this-value-serde other-value-serde))))
(StreamJoined/with key-serde this-value-serde other-value-serde))))

(left-join-windowed
[_ other-kstream value-joiner-fn windows]
Expand All @@ -286,11 +293,11 @@
{:keys [key-serde value-serde]}
{other-value-serde :value-serde}]
(clj-kstream
(.leftJoin kstream
(.leftJoin ^KStream kstream
^KStream (kstream* other-kstream)
^ValueJoiner (value-joiner value-joiner-fn)
^JoinWindows windows
(Joined/with key-serde value-serde other-value-serde))))
(StreamJoined/with key-serde value-serde other-value-serde))))

(map
[_ key-value-mapper-fn]
Expand Down Expand Up @@ -320,7 +327,7 @@
^KStream (kstream* other-kstream)
^ValueJoiner (value-joiner value-joiner-fn)
^JoinWindows windows
(Joined/with key-serde value-serde other-value-serde))))
(StreamJoined/with key-serde value-serde other-value-serde))))

(process!
[_ processor-supplier-fn state-store-names]
Expand Down Expand Up @@ -445,7 +452,7 @@
(group-by
[_ key-value-mapper-fn topic-config]
(clj-kgroupedtable
(.groupBy ktable
(.groupBy ^KTable ktable
^KeyValueMapper (key-value-mapper key-value-mapper-fn)
^Grouped (topic->grouped topic-config))))

Expand Down
7 changes: 7 additions & 0 deletions src/jackdaw/streams/protocols.clj
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,14 @@

(defprotocol IKStream
"A KStream is an abstraction of a stream of key-value pairs."

(branch
[kstream predicate-fns]
"Returns a list of KStreams, one for each of the `predicate-fns`
provided.
Deprecated, use split instead.")

(split
[kstream predicate-fns]
"Returns a list of KStreams, one for each of the `predicate-fns`
provided.")
Expand Down
7 changes: 5 additions & 2 deletions src/jackdaw/test/fixtures.clj
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

(defn- new-topic
[t]
(doto (NewTopic. (:topic-name t)
(doto (NewTopic. ^String (:topic-name t)
(int (:partition-count t))
(short (:replication-factor t)))
(.configs (:config t))))
Expand Down Expand Up @@ -204,7 +204,10 @@
(fn [t]
(if-not (class-exists? 'kafka.tools.StreamsResetter)
(throw (RuntimeException. "You must add a dependency on a kafka distrib which ships the kafka.tools.StreamsResetter tool"))
(let [rt (.newInstance (clojure.lang.RT/classForName "kafka.tools.StreamsResetter"))
(let [rt (-> "kafka.tools.StreamsResetter"
(clojure.lang.RT/classForName)
(.getDeclaredConstructor (into-array Class []))
(.newInstance (into-array [])))
args (concat ["--application-id" (get app-config "application.id")
"--bootstrap-servers" (get app-config "bootstrap.servers")]
reset-args)
Expand Down
8 changes: 4 additions & 4 deletions src/jackdaw/test/transports/kafka.clj
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
serde-map
byte-array-serde]])
(:import
java.time.Duration
org.apache.kafka.common.header.Header
org.apache.kafka.clients.consumer.Consumer
org.apache.kafka.clients.consumer.ConsumerRecord
Expand All @@ -30,7 +31,7 @@

(defn load-assignments
[consumer]
(.poll ^Consumer consumer 0)
(.poll ^Consumer consumer (Duration/ofMillis 0))
(.assignment ^Consumer consumer))

(defn seek-to-end
Expand All @@ -50,7 +51,7 @@
[messages]
(fn [consumer]
(try
(let [m (.poll ^Consumer consumer 1000)]
(let [m (.poll ^Consumer consumer (Duration/ofMillis 1000))]
(when m
(s/put-all! messages m)))
(catch Throwable e
Expand All @@ -70,8 +71,7 @@
"Clojurize the ConsumerRecord returned from consuming a kafka record"
[^ConsumerRecord consumer-record]
(when consumer-record
{:checksum (.checksum consumer-record)
:key (.key consumer-record)
{:key (.key consumer-record)
:offset (.offset consumer-record)
:partition (.partition consumer-record)
:serializedKeySize (.serializedKeySize consumer-record)
Expand Down
32 changes: 22 additions & 10 deletions src/jackdaw/test/transports/mock.clj
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,17 @@
[clojure.stacktrace :as stacktrace]
[clojure.tools.logging :as log]
[jackdaw.test.journal :as j]
[jackdaw.serdes.fn :as jfn]
[jackdaw.streams.mock :as smock]
[jackdaw.test.transports :as t :refer [deftransport]]
[jackdaw.test.serde :refer [byte-array-deserializer
[jackdaw.test.serde :refer [byte-array-serde
apply-serializers apply-deserializers serde-map]]
[manifold.stream :as s]
[manifold.deferred :as d])
(:import
(org.apache.kafka.common.record TimestampType)
(org.apache.kafka.clients.consumer ConsumerRecord)))
(org.apache.kafka.common.record TimestampType)
(org.apache.kafka.clients.consumer ConsumerRecord)
(org.apache.kafka.clients.producer ProducerRecord)))

(set! *warn-on-reflection* false)

Expand Down Expand Up @@ -80,12 +83,12 @@
(fn [driver]
(let [fetch (fn [[k t]]
{:topic k
:output (loop [collected []]
(if-let [o (.readOutput driver (:topic-name t)
byte-array-deserializer
byte-array-deserializer)]
(recur (conj collected o))
collected))})
:output (let [topic-name (:topic-name t)]
{:topic k
:output (loop [collected []]
(if-let [{:keys [key value]} (smock/consume driver (assoc byte-array-serde :topic-name topic-name))]
(recur (conj collected (ProducerRecord. topic-name key value)))
collected))})})
topic-batches (->> topic-config
(map fetch)
(remove #(empty? (:output %)))
Expand Down Expand Up @@ -160,13 +163,22 @@
{:messages messages
:process process}))

(def identity-serializer (jfn/new-serializer {:serialize (fn [_ _ data] data)}))

(deftransport :mock
[{:keys [driver topics]}]
(let [serdes (serde-map topics)
test-consumer (mock-consumer driver topics (get serdes :deserializers))
record-fn (fn [input-record]
(try
(.pipeInput driver input-record)
(-> driver
(.createInputTopic
(.topic input-record)
identity-serializer ;; already serialized in mock-producer
identity-serializer)
(.pipeInput
(.key input-record)
(.value input-record)))
(catch Exception e
(let [trace (with-out-str
(stacktrace/print-cause-trace e))]
Expand Down
2 changes: 1 addition & 1 deletion test/jackdaw/serdes/avro_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@
:namespace "com.fundingcircle"})
schema-type (schema-type avro-schema)
clj-data 4
avro-data (Integer. 4)]
avro-data (Integer/valueOf 4)]
(is (= clj-data (avro/avro->clj schema-type avro-data)))
(is (= avro-data (avro/clj->avro schema-type clj-data [])))

Expand Down
2 changes: 1 addition & 1 deletion test/jackdaw/serdes/edn2_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
(defspec edn-print-length-test 20
(testing "EDN data is the same after serialization and deserialization with *print-length*."
(binding [*print-length* 100]
(prop/for-all [x (gen/vector gen/int (inc *print-length*))]
(prop/for-all [x (gen/vector gen/small-integer (inc *print-length*))]
(is (= x (->> (.serialize (jse/edn-serializer) nil x)
(.deserialize (jse/edn-deserializer) nil))))))))

Expand Down
2 changes: 1 addition & 1 deletion test/jackdaw/serdes/edn_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
(defspec edn-print-length-test 20
(testing "EDN data is the same after serialization and deserialization with *print-length*."
(binding [*print-length* 100]
(prop/for-all [x (gen/vector gen/int (inc *print-length*))]
(prop/for-all [x (gen/vector gen/small-integer (inc *print-length*))]
(is (= x (->> (.serialize (jse/serializer) nil x)
(.deserialize (jse/deserializer) nil))))))))

Expand Down
12 changes: 6 additions & 6 deletions test/jackdaw/streams_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@
(let [topic-a (mock/topic "topic-a")
topic-b (mock/topic "topic-b")
topic-c (mock/topic "topic-c")
windows (JoinWindows/of 1000)
windows (JoinWindows/of (Duration/ofMillis 1000))
driver (mock/build-driver (fn [builder]
(let [left-kstream (k/kstream builder topic-a)
right-kstream (k/kstream builder topic-b)]
Expand Down Expand Up @@ -368,7 +368,7 @@
(let [topic-a (mock/topic "topic-a")
topic-b (mock/topic "topic-b")
topic-c (mock/topic "topic-c")
windows (JoinWindows/of 1000)
windows (JoinWindows/of (Duration/ofMillis 1000))
driver (mock/build-driver (fn [builder]
(let [left-kstream (k/kstream builder topic-a)
right-kstream (k/kstream builder topic-b)]
Expand Down Expand Up @@ -1060,7 +1060,7 @@
(-> builder
(k/kstream topic-a)
(k/group-by (fn [[k _v]] (long (/ k 10))) topic-a)
(k/window-by-time (TimeWindows/of 1000))
(k/window-by-time (TimeWindows/of (Duration/ofMillis 1000)))
(k/reduce + topic-a)
(k/to-kstream)
(k/map (fn [[k v]] [(.key k) v]))
Expand All @@ -1084,7 +1084,7 @@
(-> builder
(k/kstream topic-a)
(k/group-by-key)
(k/window-by-time (TimeWindows/of 1000))
(k/window-by-time (TimeWindows/of (Duration/ofMillis 1000)))
(k/reduce + topic-a)
(k/to-kstream)
(k/map (fn [[k v]] [(.key k) v]))
Expand All @@ -1108,7 +1108,7 @@
(-> builder
(k/kstream topic-a)
(k/group-by (fn [[k _v]] (long (/ k 10))) topic-a)
(k/window-by-session (SessionWindows/with 1000))
(k/window-by-session (SessionWindows/with (Duration/ofMillis 1000)))
(k/reduce + topic-a)
(k/to-kstream)
(k/map (fn [[k v]] [(.key k) v]))
Expand All @@ -1135,7 +1135,7 @@
(-> builder
(k/kstream topic-a)
(k/group-by-key)
(k/window-by-session (SessionWindows/with 1000))
(k/window-by-session (SessionWindows/with (Duration/ofMillis 1000)))
(k/aggregate (constantly 0)
(fn [agg [_k v]]
(+ agg v))
Expand Down

0 comments on commit b685609

Please sign in to comment.