Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allocations improvements #69

Merged
merged 14 commits into from
Sep 10, 2023
3 changes: 3 additions & 0 deletions .github/workflows/ci_master.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
name: "Push CI - master"

permissions:
contents: write

on:
push:
branches:
Expand Down
20 changes: 20 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,24 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

### [3.1.0] - 2023-08-22

#### Added

* Add the `aerospike-clj.collections/->list` function, which is similar to `clojure.core/mapv`, but it's more efficient
when the input is not a Clojure sequence.

#### Changed

* Make the `aerospike-clj.utils/v->array` multi-arity, allowing to pass a `mapper-fn` to map the values before setting
them into the array.
* Optimize the `aerospike-clj.utils/v->array` function by calling `java.util.Collection#toArray` with a 0-length array,
this will force the implementation to use the more performant `java.util.Arrays.copyOf`.

#### Deprecated

* Deprecate `aerospike-clj.utils/string-keys?`.

## [3.0.0] - 2023-08-03

### Changed
Expand Down Expand Up @@ -201,6 +219,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

[A complete list of all java client related changes](https://www.aerospike.com/download/client/java/notes.html)

[3.1.0]: https://github.com/AppsFlyer/aerospike-clj/pull/68

[3.0.0]: https://github.com/AppsFlyer/aerospike-clj/pull/62

[2.0.7]: https://github.com/AppsFlyer/aerospike-clj/pull/64
Expand Down
5 changes: 3 additions & 2 deletions project.clj
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
(defproject com.appsflyer/aerospike-clj "3.0.0-SNAPSHOT"
(defproject com.appsflyer/aerospike-clj "3.1.0-SNAPSHOT"
:description "An Aerospike Clojure client."
:url "https://github.com/AppsFlyer/aerospike-clj"
:license {:name "Eclipse Public License"
Expand All @@ -23,7 +23,8 @@
[cheshire "5.11.0"]
[tortue/spy "2.14.0"]
[com.fasterxml.jackson.core/jackson-databind "2.11.2"]
[clj-kondo "2022.04.25"]]
[clj-kondo "2022.04.25"]
[com.clojure-goes-fast/clj-java-decompiler "0.3.4"]]
evg-tso marked this conversation as resolved.
Show resolved Hide resolved
:eftest {:multithread? false
:report eftest.report.junit/report
:report-to-file "target/junit.xml"}
Expand Down
36 changes: 21 additions & 15 deletions src/main/clojure/aerospike_clj/bins.clj
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
(ns aerospike-clj.bins
(:require [aerospike-clj.utils :as utils])
(:import [com.aerospike.client Bin]
[clojure.lang IPersistentMap]))
(:import (clojure.lang IPersistentMap)
(com.aerospike.client Bin)))

(set! *warn-on-reflection* true)

(def ^:private ^:const MAX_BIN_NAME_LENGTH 14)

Expand All @@ -15,22 +17,26 @@
(throw (Exception. (format "%s is %s characters. Bin names have to be <= %d characters..." bin-name (.length bin-name) MAX_BIN_NAME_LENGTH))))
(Bin/asNull bin-name))

(def ^:private x-bin-convert
(comp
(map (fn [[k v]] [k (utils/sanitize-bin-value v)]))
(map (fn [[k v]] (create-bin k v)))))

(defn- map->multiple-bins [^IPersistentMap m]
(let [bin-names (keys m)]
(if (utils/string-keys? bin-names)
(->> (into [] x-bin-convert m)
(utils/v->array Bin))
(throw (Exception. (format "Aerospike only accepts string values as bin names. Please ensure all keys in the map are strings."))))))
(defn- map->multiple-bins ^"[Lcom.aerospike.client.Bin;" [^IPersistentMap m]
(let [size (.count m)
iterator (.iterator m)
res ^"[Lcom.aerospike.client.Bin;" (make-array Bin size)]
(loop [i 0]
(when (and (< i size)
(.hasNext iterator))
(let [entry (.next iterator)
key-entry (key entry)]
(when-not (string? key-entry)
(throw (Exception. (format "Aerospike only accepts string values as bin names. Please ensure all keys in the map are strings."))))
(aset res i (create-bin key-entry (utils/sanitize-bin-value (val entry))))
(recur (inc i)))))
res))

(defn data->bins
"Function to identify whether `data` will be stored as a single or multiple bin record.
Only Clojure maps will default to multiple bins. Nested data structures are supported."
[data]
^"[Lcom.aerospike.client.Bin;" [data]
(if (map? data)
(map->multiple-bins data)
(utils/v->array Bin [^Bin (Bin. "" (utils/sanitize-bin-value data))])))
(doto ^"[Lcom.aerospike.client.Bin;" (make-array Bin 1)
(aset 0 (Bin. "" (utils/sanitize-bin-value data))))))
61 changes: 30 additions & 31 deletions src/main/clojure/aerospike_clj/client.clj
Original file line number Diff line number Diff line change
@@ -1,30 +1,31 @@
(ns aerospike-clj.client
(:refer-clojure :exclude [update])
(:require [clojure.string :as s]
[clojure.tools.logging :as log]
[promesa.core :as p]
[promesa.exec :as p-exec]
[aerospike-clj.policy :as policy]
(:require [aerospike-clj.aerospike-record :as record]
[aerospike-clj.bins :as bins]
[aerospike-clj.utils :as utils]
[aerospike-clj.metrics :as metrics]
[aerospike-clj.collections :as collections]
[aerospike-clj.key :as as-key]
[aerospike-clj.listeners]
[aerospike-clj.aerospike-record :as record]
[aerospike-clj.protocols :as pt])
(:import (java.time Instant)
(java.util List Collection ArrayList Arrays)
(com.aerospike.client AerospikeClient Key Bin Operation BatchRead)
(com.aerospike.client.async EventLoop NioEventLoops EventLoops)
[aerospike-clj.metrics :as metrics]
[aerospike-clj.policy :as policy]
[aerospike-clj.protocols :as pt]
[aerospike-clj.utils :as utils]
[clojure.string :as s]
[clojure.tools.logging :as log]
[promesa.core :as p]
[promesa.exec :as p-exec])
(:import (aerospike_clj.listeners AsyncBatchListListener AsyncBatchOperateListListener AsyncDeleteListener
AsyncExistsArrayListener AsyncExistsListener AsyncInfoListener
AsyncRecordListener AsyncRecordSequenceListener AsyncWriteListener)
(com.aerospike.client BatchRecord Host Key)
(com.aerospike.client AerospikeClient BatchRead Bin Key Operation)
(com.aerospike.client.async EventLoop EventLoops NioEventLoops)
(com.aerospike.client.cluster Node)
(com.aerospike.client.policy Policy BatchPolicy ClientPolicy
RecordExistsAction WritePolicy ScanPolicy
InfoPolicy)
(com.aerospike.client Key Host BatchRecord)
(aerospike_clj.listeners AsyncExistsListener AsyncDeleteListener AsyncWriteListener
AsyncInfoListener AsyncRecordListener AsyncRecordSequenceListener
AsyncBatchListListener AsyncExistsArrayListener AsyncBatchOperateListListener)
(com.aerospike.client.listener BatchOperateListListener)
(com.aerospike.client.policy BatchPolicy ClientPolicy InfoPolicy
Policy RecordExistsAction ScanPolicy
WritePolicy)
(java.time Instant)
(java.util Arrays List)
(java.util.concurrent Executor)))

(def
Expand Down Expand Up @@ -103,7 +104,7 @@
(AsyncWriteListener. op-future)
^WritePolicy policy
^Key (pt/create-key index dbns set-name)
^"[Lcom.aerospike.client.Bin;" bins)
bins)
(register-events op-future client-events :write index start-time conf)))

(deftype SimpleAerospikeClient [client
Expand Down Expand Up @@ -175,14 +176,14 @@
(get-batch [_this batch-reads conf]
(let [op-future (p/deferred)
start-time (System/nanoTime)
batch-reads-arr (ArrayList. ^Collection (mapv #(map->batch-read % dbns) batch-reads))]
batch-reads-arr (collections/->list #(map->batch-read % dbns) batch-reads)]
(.get ^AerospikeClient client
^EventLoop (.next ^EventLoops el)
(AsyncBatchListListener. op-future)
^BatchPolicy (:policy conf)
^List batch-reads-arr)
batch-reads-arr)
(-> op-future
(p/then' #(mapv batch-record->map %) completion-executor)
(p/then' #(collections/->list batch-record->map %) completion-executor)
(p/then' (:transcoder conf identity))
(register-events client-events :read-batch nil start-time conf))))

Expand All @@ -193,7 +194,7 @@
(let [op-future (p/deferred)
start-time (System/nanoTime)
transcoder (:transcoder conf identity)
indices (utils/v->array Key (mapv #(pt/create-key (:index %) dbns (:set %)) indices))]
indices (utils/v->array Key #(pt/create-key (:index %) dbns (:set %)) indices)]
(.exists ^AerospikeClient client
^EventLoop (.next ^EventLoops el)
(AsyncExistsArrayListener. op-future)
Expand Down Expand Up @@ -342,7 +343,7 @@
(AsyncWriteListener. op-future)
^WritePolicy policy
^Key (pt/create-key index dbns set-name)
^"[Lcom.aerospike.client.Bin;" (utils/v->array Bin (mapv bins/set-bin-as-null bin-names)))
^"[Lcom.aerospike.client.Bin;" (utils/v->array Bin bins/set-bin-as-null bin-names))
(-> op-future
(p/then' identity completion-executor)
(register-events client-events :write index start-time conf))))
Expand Down Expand Up @@ -375,9 +376,7 @@
policy (:policy conf)
batch-list (if (list? batch-records)
batch-records
(->> batch-records
(utils/v->array BatchRecord)
(Arrays/asList)))
(into [] batch-records))
start-time (System/nanoTime)
transcoder (:transcoder conf identity)]
(.operate ^AerospikeClient client
Expand All @@ -386,7 +385,7 @@
^BatchPolicy policy
^List batch-list)
(-> op-future
(p/then' (comp transcoder #(mapv batch-record->map %)) completion-executor)
(p/then' (comp transcoder #(collections/->list batch-record->map %)) completion-executor)
(register-events client-events :batch-operate nil start-time conf))))


Expand Down Expand Up @@ -426,7 +425,7 @@
(register-events client-events :info nil start-time conf))))

(get-nodes [_this]
(into [] (.getNodes ^AerospikeClient client)))
(Arrays/asList (.getNodes ^AerospikeClient client)))

(get-cluster-stats [_this]
(-> (.getClusterStats ^AerospikeClient client)
Expand Down
19 changes: 19 additions & 0 deletions src/main/clojure/aerospike_clj/collections.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
(ns aerospike-clj.collections
(:import (java.util ArrayList Collection Collections List)
(java.util.function Consumer)))

(defn ->list
"Returns a new [[java.util.List]] containing the result of applying `mapper-fn` to each item in `col`.
Returns an unmodifiable list.
*Note*: This will usually be faster than `(mapv mapper-fn col)` because:
- This function allocates a new [[java.util.ArrayList]] in the exact `(.size col)` size, and then
fills it with the mapped values.
- If the underlying collection is not a Clojure sequence, then `mapv` will first convert it
to a Clojure sequence and then map over it. This function will not do that."
^List [mapper-fn ^Collection col]
(let [res (ArrayList. (.size col))]
(.forEach col
(reify Consumer
(accept [_ item]
(.add res (mapper-fn item)))))
asafch marked this conversation as resolved.
Show resolved Hide resolved
(Collections/unmodifiableList res)))
17 changes: 14 additions & 3 deletions src/main/clojure/aerospike_clj/utils.clj
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@
(= bin-names [""]))

(defn string-keys?
"Predicate function to determine whether all keys provided for bins are strings."
{:docstring "Predicate function to determine whether all keys provided for bins are strings."
asafch marked this conversation as resolved.
Show resolved Hide resolved
:deprecated "3.1.0"}
[bin-names]
(every? string? bin-names))

Expand All @@ -39,8 +40,18 @@

(defn v->array
"An optimized way to convert vectors into Java arrays of type `clazz`."
asafch marked this conversation as resolved.
Show resolved Hide resolved
[clazz v]
(.toArray ^Collection v ^"[Ljava.lang.Object;" (make-array clazz (count v))))
([clazz ^Collection v]
(.toArray v ^"[Ljava.lang.Object;" (make-array clazz 0)))
([clazz mapper-fn ^Collection v]
(let [size (.size v)
res ^"[Ljava.lang.Object;" (make-array clazz size)
iterator (.iterator v)]
(loop [i 0]
(when (and (< i size)
(.hasNext iterator))
asafch marked this conversation as resolved.
Show resolved Hide resolved
(aset res i (mapper-fn (.next iterator)))
(recur (inc i))))
res)))

(defn vectorize
"convert a single value to a vector or any collection to the equivalent vector.
Expand Down
Loading