diff --git a/CHANGES b/CHANGES index bead73c8..8ed75669 100644 --- a/CHANGES +++ b/CHANGES @@ -1,3 +1,12 @@ +* 0.3.0 + * simple benchmark now uses docopt for cli parsing + * New make target to run some benchmarks 'make benchmark' + * simple benchmark now supports pipeline tests + * Renamed RedisCluster --> StrictRedisCluster + * Implement backwards compatible redis.Redis class in cluster mode. It was named RedisCluster and everyone updating from 0.2.0 to 0.3.0 should consult docs/Upgrading.md for instructions on how to change your code. + * Added comprehensive documentation regarding pipelines + * Meta retrieval commands (slots, nodes, info) for Redis Cluster. (iandyh) + * 0.2.0 * Moved pipeline code into new file. * Code now uses a proper cluster connection pool class that handles diff --git a/MANIFEST.in b/MANIFEST.in index 93095bba..c09e29cb 100644 --- a/MANIFEST.in +++ b/MANIFEST.in @@ -3,5 +3,4 @@ include docs/License.txt include CHANGES include setup.py include README.md -include docs/ALPHA.md -include docs/Authorrs +include docs/Authors diff --git a/Makefile b/Makefile index 6a78c971..5f3534f6 100644 --- a/Makefile +++ b/Makefile @@ -120,6 +120,7 @@ help: @echo " sdist make a source distribution" @echo " bdist make an egg distribution" @echo " install install package" + @echo " benchmark runs all benchmarks. Assumes nodes running on ports 7001 and 7007" @echo " *** CI Commands ***" @echo " start starts a test redis cluster" @echo " stop stop all started redis nodes (Started via 'make start' only affected)" @@ -209,4 +210,18 @@ redis-install: gem install redis sleep 3 +benchmark: + @echo "" + @echo " -- Running Simple benchmark with StrictRedis lib and non cluster server --" + python benchmarks/simple.py --port 7007 --timeit --nocluster + @echo "" + @echo " -- Running Simple benchmark with StrictRedisCluster lib and cluster server --" + python benchmarks/simple.py --port 7001 --timeit + @echo "" + @echo " -- Running Simple benchmark with pipelines & StrictRedis lib and non cluster server --" + python benchmarks/simple.py --port 7007 --timeit --pipeline --nocluster + @echo "" + @echo " -- Running Simple benchmark with pipelines & StrictRedisCluster lib and cluster server --" + python benchmarks/simple.py --port 7001 --timeit --pipeline + .PHONY: test diff --git a/README.md b/README.md index 2b170f46..21a5ea12 100644 --- a/README.md +++ b/README.md @@ -4,19 +4,35 @@ Redis cluster client in python for the official cluster support targeted for red This project is a port of `redis-rb-cluster` by antirez, with alot of added functionality.
The original source can be found at https://github.com/antirez/redis-rb-cluster -[![Build Status](https://travis-ci.org/Grokzen/redis-py-cluster.svg?branch=master)](https://travis-ci.org/Grokzen/redis-py-cluster) [![Coverage Status](https://coveralls.io/repos/Grokzen/redis-py-cluster/badge.png)](https://coveralls.io/r/Grokzen/redis-py-cluster) [![Latest Version](https://pypip.in/version/redis-py-cluster/badge.svg)](https://pypi.python.org/pypi/redis-py-cluster/) [![Downloads](https://pypip.in/download/redis-py-cluster/badge.svg)](https://pypi.python.org/pypi/redis-py-cluster/) [![Supported Python versions](https://pypip.in/py_versions/redis-py-cluster/badge.svg)](https://pypi.python.org/pypi/redis-py-cluster/) [![License](https://pypip.in/license/redis-py-cluster/badge.svg)](https://pypi.python.org/pypi/redis-py-cluster/) [![Gitter](https://badges.gitter.im/Join Chat.svg)](https://gitter.im/Grokzen/redis-py-cluster?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge) [![Code Health](https://landscape.io/github/Grokzen/redis-py-cluster/unstable/landscape.svg)](https://landscape.io/github/Grokzen/redis-py-cluster/unstable) +[![Build Status](https://travis-ci.org/Grokzen/redis-py-cluster.svg?branch=master)](https://travis-ci.org/Grokzen/redis-py-cluster) [![Coverage Status](https://coveralls.io/repos/Grokzen/redis-py-cluster/badge.png)](https://coveralls.io/r/Grokzen/redis-py-cluster) [![PyPI version](https://badge.fury.io/py/redis-py-cluster.svg)](http://badge.fury.io/py/redis-py-cluster) [![Gitter](https://badges.gitter.im/Join Chat.svg)](https://gitter.im/Grokzen/redis-py-cluster?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge) [![Code Health](https://landscape.io/github/Grokzen/redis-py-cluster/unstable/landscape.svg)](https://landscape.io/github/Grokzen/redis-py-cluster/unstable) + + + +# Project status + +The project is not dead, but not much new development is done right now. I do answer issue reports and pull requests as soon as possible, and if you have a problem you can ping me in the gitter channel that you can find here [![Gitter](https://badges.gitter.im/Join Chat.svg)](https://gitter.im/Grokzen/redis-py-cluster?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge) and I will help you out with problems or usage of this lib. + +As of release `0.3.0` this project will be considered stable and usable in production. Just remember that if you are going to use redis cluster, please read up on the documentation that you can find at the bottom of this readme. It contains usage examples and descriptions of what is implemented, what is not implemented, and why things are the way they are. + +On the topic of porting/moving this code into `redis-py`, there is currently work over here https://github.com/andymccurdy/redis-py/pull/604 that will bring cluster support based on this code. But my suggestion is that you use this lib until that work is completed. + + + +## Upgrading instructions + +Please read the [following](docs/Upgrading.md) documentation that will go through all changes that are required when upgrading `redis-py-cluster` between versions. ## Dependencies & supported python versions -- redis >= 2.10.2 -- Cluster enabled redis servers. Only Redis 3.0 beta.7 and above is supported because of CLUSTER SLOTS command was introduced in that release.
-- Optional: hiredis >= 0.1.3 +- Python: redis >= `2.10.2` is required +- Redis server >= `3.0.0` is required +- Optional Python: hiredis >= `0.1.3` Hiredis is tested and supported on all supported python versions. -Supported python versions: +Supported python versions; all minor releases in each major version should be supported unless otherwise stated here: - 2.7.x - 3.2.x @@ -48,20 +64,23 @@ $ python setup.py install Small sample script that show how to get started with RedisCluster. `decode_responses=True` is required to have when running on python3. ```python ->>> from rediscluster import RedisCluster +>>> from rediscluster import StrictRedisCluster >>> startup_nodes = [{"host": "127.0.0.1", "port": "7000"}] ->>> rc = RedisCluster(startup_nodes=startup_nodes, decode_responses=True) +>>> rc = StrictRedisCluster(startup_nodes=startup_nodes, decode_responses=True) >>> rc.set("foo", "bar") True >>> rc.get("foo") 'bar' ``` -The following imports can be imported from `redis` package. +The following imports can be imported from the `redis` package. -- `RedisCluster` -- `StrictClusterPipeline` -- `ClusterPubSub` +- `StrictRedisCluster` +- `RedisCluster` +- `StrictClusterPipeline` +- `ClusterPubSub` + +`StrictRedisCluster` is based on `redis.StrictRedis` and `RedisCluster` has the same functionality as `redis.Redis` even if it is not directly based on it. @@ -71,38 +90,36 @@ All tests are currently built around a 6 redis server cluster setup (3 masters + The easiest way to setup a cluster is to use either a Docker or Vagrant. They are both described in [Setup a redis cluster. Manually, Docker & Vagrant](docs/Cluster_Setup.md). +To run all tests in all supported environments with `tox`, read [Tox multienv testing](docs/Tox.md) -### Tox - Multi environment testing - -Tox is the easiest way to run all tests because it will manage all dependencies and run the correct test command for you. - -TravisCI will use tox to run tests on all supported python & hiredis versions. - -Install tox with `pip install tox` - -To run all environments you need all supported python versions installed on your machine. (See supported python versions list) and you also need the python-dev package for all python versions to build hiredis. - -To run a specific python version use either `tox -e py27` or `tox -e py34` - ## More documentation More detailed documentation can be found in `docs` folder. +- [Benchmarks](docs/Benchmarks.md) - [Pubsub](docs/Pubsub.md) - [Setup a redis cluster. Manually, Docker & Vagrant](docs/Cluster_Setup.md) - [Command differences](docs/Commands.md) - [Limitations and differences](docs/Limits_and_differences.md) - [Redisco support (Django ORM)](docs/Redisco.md) +- [Pipelines](docs/Pipelines.md) - [Threaded Pipeline support](docs/Threads.md) +- [Cluster Management class](docs/ClusterMgt.md) - [Authors](docs/Authors) ## Disclaimer -Both this client and Redis Cluster are a work in progress that is not suitable to be used in production environments. This is only my current personal opinion about both projects. +Both Redis cluster and redis-py-cluster are considered stable and production ready. + +But this depends on what you are going to use clustering for. In the simple use cases with SET/GET and other single key functions there are no issues. If you require multi key functionality or pipelines, then you must be very careful when developing, because they work slightly differently from a normal redis server.
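One way to stay safe with multi key operations is to force the keys involved into the same hash slot with the hash tag syntax from the Redis cluster spec: only the `{...}` part of the key is hashed, so keys that share it always land on the same node. A minimal sketch (key names are made up for illustration):

```python
>>> rc = StrictRedisCluster(startup_nodes=startup_nodes, decode_responses=True)
>>> # "A{foo}" and "B{foo}" both hash on "foo" and therefore map to the same slot
>>> rc.set("A{foo}", "1")
True
>>> rc.set("B{foo}", "2")
True
```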
+ +If you require advanced features like pubsub or scripting, this lib and redis do not handle those kinds of use-cases very well. You either need to develop a custom solution yourself or use a non clustered redis server for that. + +Finally, this lib itself is very stable and I know of at least 2 companies that use this in production with high loads and big cluster sizes. diff --git a/benchmarks/simple.py b/benchmarks/simple.py index 7f651d20..af30da85 100644 --- a/benchmarks/simple.py +++ b/benchmarks/simple.py @@ -2,10 +2,9 @@ # python std lib import time -import argparse -# rediscluster imports -from rediscluster.rediscluster import RedisCluster +# 3rd party imports +from docopt import docopt def loop(rc, reset_last_key=None): @@ -35,67 +34,79 @@ def loop(rc, reset_last_key=None): except Exception as e: print("error {0}".format(e)) - time.sleep(0.1) + time.sleep(0.05) def timeit(rc, itterations=50000): - """ Time how long it take to run a number of set/get:s + """ + Time how long it takes to run a number of set/get:s + """ t0 = time.time() for i in xrange(0, itterations): # noqa - try: - s = "foo{0}".format(i) - rc.set(s, i) - rc.get(s) - except Exception as e: - print("error {0}".format(e)) + s = "foo{0}".format(i) + rc.set(s, i) + rc.get(s) t1 = time.time() - t0 - print("{}k SET and then GET took: {} seconds... {} itterations per second".format((itterations / 1000), t1, (itterations / t1))) + print("{}k SET/GET operations took: {} seconds... {} operations per second".format((itterations / 1000) * 2, t1, (itterations / t1) * 2)) + + +def timeit_pipeline(rc, itterations=50000): + """ + Time how long it takes to run a number of set/get:s inside a cluster pipeline + """ + t0 = time.time() + for i in xrange(0, itterations): # noqa + s = "foo{0}".format(i) + + p = rc.pipeline() + p.set(s, i) + p.get(s) + p.execute() + + t1 = time.time() - t0 + print("{}k SET/GET operations inside pipelines took: {} seconds... {} operations per second".format((itterations / 1000) * 2, t1, (itterations / t1) * 2)) if __name__ == "__main__": - parser = argparse.ArgumentParser( - conflict_handler="resolve", - formatter_class=argparse.ArgumentDefaultsHelpFormatter, - ) - parser.add_argument( - "-h", - "--host", - help="host of a cluster member", - default="127.0.0.1" - ) - parser.add_argument( - "-p", - "--port", - help="port of a cluster member", - type=int, - default=7000 - ) - parser.add_argument( - "--timeit", - help="run a mini benchmark to test performance", - action="store_true" - ) - parser.add_argument( - "--resetlastkey", - help="reset __last__ key", - action="store_true" - ) - args = parser.parse_args() - - startup_nodes = [ - {"host": args.host, "port": args.port} - ] - - rc = RedisCluster(startup_nodes=startup_nodes, max_connections=32, socket_timeout=0.1, decode_responses=True) - - if args.timeit: + __docopt__ = """ Usage: simple [--host IP] [--port PORT] [--nocluster] [--timeit] [--pipeline] [--resetlastkey] [-h] [--version] + +Options: + --nocluster If flag is set then StrictRedis will be used instead of cluster lib + --host IP Redis server to test against [default: 127.0.0.1] + --port PORT Port on redis server [default: 7000] + --timeit Run a mini benchmark to test performance + --pipeline Only usable with --timeit flag. Runs SET/GET inside pipelines.
+ --resetlastkey Reset the __last__ key + -h --help Show this help and exit + -v --version Show version and exit + """ + + args = docopt(__docopt__, version="0.3.0") + + startup_nodes = [{"host": args["--host"], "port": args["--port"]}] + + if not args["--nocluster"]: + from rediscluster import StrictRedisCluster + rc = StrictRedisCluster(startup_nodes=startup_nodes, max_connections=32, socket_timeout=0.1, decode_responses=True) + else: + from redis import StrictRedis + rc = StrictRedis(host=args["--host"], port=args["--port"], socket_timeout=0.1, decode_responses=True) + + if args["--timeit"]: test_itterstions = [ + 5000, 10000, - 25000, + 20000, ] - for itterations in test_itterstions: - timeit(rc, itterations=itterations) + + if args["--pipeline"]: + for itterations in test_itterstions: + timeit_pipeline(rc, itterations=itterations) + else: + for itterations in test_itterstions: + timeit(rc, itterations=itterations) else: - loop(rc, reset_last_key=args.resetlastkey) + loop(rc, reset_last_key=args["--resetlastkey"]) diff --git a/dev-requirements.txt b/dev-requirements.txt index 6982409f..4946aada 100644 --- a/dev-requirements.txt +++ b/dev-requirements.txt @@ -5,3 +5,4 @@ hiredis >= 0.1.3 pytest >= 2.5.0 testfixtures >= 4.0.1 mock == 1.0.1 +docopt == 0.6.2 diff --git a/docs/ALPHA.md b/docs/ALPHA.md deleted file mode 100644 index aff3f5c1..00000000 --- a/docs/ALPHA.md +++ /dev/null @@ -1,14 +0,0 @@ -# Beta notes - -Release 0.2.0 of redis-cluster is a Beta release. - -Before using this lib please read the documentation and understand the major differences and problems that exists with redis clustering. The following documents is best to read. - -- [Command differences](docs/Commands.md) -- [Limitations and differences](docs/Limits_and_differences.md) - -It is recommended to test this lib in real world scenarios to find problems and areas to improve for next release. - -It is NOT recommended to run this lib in production. - -If you find any bugs or have ideas for improvment please open a ticket at https://github.com/Grokzen/redis-py-cluster diff --git a/docs/Authors b/docs/Authors index 5a319b37..30fa9dce 100644 --- a/docs/Authors +++ b/docs/Authors @@ -17,3 +17,4 @@ Authors who contributed code or testing: - Dobrite - https://github.com/dobrite - 72squared - https://github.com/72squared - Neuron Teckid - https://github.com/neuront + - iandyh - https://github.com/iandyh diff --git a/docs/Benchmarks.md b/docs/Benchmarks.md new file mode 100644 index 00000000..4e0def3e --- /dev/null +++ b/docs/Benchmarks.md @@ -0,0 +1,34 @@ +# Benchmarks + +There are a few benchmarks that are designed to test specific parts of the code and that will show how big of a performance difference there is between using this lib and the normal Redis client. + + + +## Setup benchmarks + +Before running any benchmark you should install this lib in editable mode inside a virtualenv so it can import the `StrictRedisCluster` lib. + +Install with + +``` +$ pip install -e . +``` + +You also need a few redis servers to test against. It is required to have 1 cluster with at least one node on port `7001` and it is also required to have a non-clustered server on port `7007`. + + +## Benchmarks + +`simple.py` - This benchmark can be used to measure a simple `set` and `get` operation chain. It also supports running pipelines by adding the flag `--pipeline` + + + +## Run predefined benchmarks + +There is a set of predefined benchmarks that can be run to measure the performance drop from using this library.
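Each benchmark can also be invoked on its own; for example, to time pipelined SET/GET against the cluster node assumed above on port `7001` (the same invocation the Makefile target uses):

```
$ python benchmarks/simple.py --port 7001 --timeit --pipeline
```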
+ +To run the benchmarks ``` make benchmark ``` diff --git a/docs/ClusterMgt.md b/docs/ClusterMgt.md new file mode 100644 index 00000000..f7a4bbb7 --- /dev/null +++ b/docs/ClusterMgt.md @@ -0,0 +1,39 @@ +# Cluster Mgt class + +The redis cluster can be managed through a cluster management class. It can for example be used to query the cluster for the current slots or nodes setup. + +The following methods are implemented: + +- info +- slots +- nodes + +The following methods are not yet implemented: + +- addslots +- count_failure_reports +- countkeysinslot +- delslots +- failover +- forget +- getkeysinslot +- keyslot +- meet +- replicate +- reset +- saveconfig +- set_config_epoch +- setslot +- slaves + + + +# Usage example + +```python +>>> from rediscluster.cluster_mgt import RedisClusterMgt +>>> startup_nodes = [{"host": "127.0.0.1", "port": "7000"}] +>>> r = RedisClusterMgt(startup_nodes) +>>> r.slots() +{'slave': defaultdict(<type 'list'>, {'172.17.42.12:7003': [[0L, 5460L]], '172.17.42.12:7005': [[10923L, 16383L]], '172.17.42.12:7004': [[5461L, 10922L]]}), 'master': defaultdict(<type 'list'>, {'172.17.42.12:7002': [[10923L, 16383L]], '172.17.42.12:7001': [[5461L, 10922L]], '172.17.42.12:7000': [[0L, 5460L]]})} ``` diff --git a/docs/Cluster_Setup.md b/docs/Cluster_Setup.md index 6a0bb82d..4f95b2cf 100644 --- a/docs/Cluster_Setup.md +++ b/docs/Cluster_Setup.md @@ -1,7 +1,7 @@ # How to setup a cluster manually - Redis cluster tutorial: http://redis.io/topics/cluster-tutorial - - Redis cluster specs: http://redis.io/topics/cluster-spect + - Redis cluster specs: http://redis.io/topics/cluster-spec - This video will describe how to setup and use a redis cluster: http://vimeo.com/63672368 (This video old and maybe outdated) diff --git a/docs/Commands.md b/docs/Commands.md index 9f63ef9c..4f848fb3 100644 --- a/docs/Commands.md +++ b/docs/Commands.md @@ -1,6 +1,6 @@ # Implemented commands -This document will describe all changes that RedisCluster have done to make a command to work. +This document will describe all changes that StrictRedisCluster has done to make a command work. If a command is not listed here then the default implementation in 'StrictRedis' is used. diff --git a/docs/Limits_and_differences.md b/docs/Limits_and_differences.md index eb593b5a..9523d077 100644 --- a/docs/Limits_and_differences.md +++ b/docs/Limits_and_differences.md @@ -2,7 +2,7 @@ There is alot of differences that have to be taken into consideration when using redis cluster. -Any method that can operate on multiple keys have to be reimplemented in the client and in some cases that is not possible to do. In general any method that is overriden in RedisCluster have lost the ability of being atomic. +Any method that can operate on multiple keys has to be reimplemented in the client, and in some cases that is not possible to do. In general any method that is overridden in StrictRedisCluster has lost the ability of being atomic. Pipelines do not work the same way in a cluster. In `StrictRedis` it batch all commands so that they can be executed at the same time when requested. But with RedisCluster pipelines will send the command directly to the server when it is called, but it will still store the result internally and return the same data from .execute(). This is done so that the code still behaves like a pipeline and no code will break. A better solution will be implemented in the future.
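As a sketch of the pipeline behaviour described above (assuming a cluster node on `127.0.0.1:7000`):

```python
from rediscluster import StrictRedisCluster

rc = StrictRedisCluster(startup_nodes=[{"host": "127.0.0.1", "port": "7000"}], decode_responses=True)

p = rc.pipeline()
p.set("foo", "bar")  # sent to the owning node as soon as it is called
p.get("foo")         # also sent immediately; the result is stored internally
print(p.execute())   # results are still returned together: [True, 'bar']
```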
diff --git a/docs/Pipelines.md b/docs/Pipelines.md new file mode 100644 index 00000000..386b26f0 --- /dev/null +++ b/docs/Pipelines.md @@ -0,0 +1,122 @@ +# The philosophy on pipelines + +After playing around with pipelines and thinking about possible solutions that could be used in a cluster setting, this document will describe how pipelines work, and the strengths and weaknesses of the implementation that was chosen. + +Why can't we reuse the pipeline code in `redis-py`? In short, it is almost the same reason why code from the normal redis client can't be reused in a cluster environment, and that is because of the slots system. Redis cluster consists of a number of slots that are distributed across a number of servers, and each key belongs in one of these slots. + +In the normal pipeline implementation in `redis-py` we can batch all the commands and send them to the server at once, thus speeding up the code by not issuing many requests one after another. We can say that we have a defined and guaranteed execution order because of this. + +One problem that appears when you want to do pipelines in a cluster environment is that you can't have guaranteed execution order in the same way as a single server pipeline. The problem is that because you can queue a command for any key, we will in most cases end up having to talk to 2 or more nodes in the cluster to execute the pipeline. There is no single place/node/way to send the pipeline and have redis sort everything out by itself via some internal mechanisms. Because of that, when we build a pipeline for a cluster we have to build several smaller pipelines and send each one to its designated node in the cluster. + +When the pipeline is executed in the client, each key is checked for what slot it should be sent to and the pipelines are built up based on that information. One thing to note here is that there will be partially correct execution order if you look over the entire cluster, because for each pipeline the ordering will be correct. It can also be argued that the correct execution order is applied/valid for each slot in the cluster. + +The next thing to take into consideration is what commands should be available and which should be blocked/locked. + +In most cases and in almost all solutions, multi key commands have to be blocked hard from being executed inside a pipeline. This would only be possible if you have a pipeline implementation that always executes each command immediately as it is queued up. That solution would only give the interface of working like a pipeline to ensure old code will still work, but it would not give any benefits or advantages other than that all commands and old code would still work. + +In the solution for this lib, multi key commands are blocked hard and will probably not be enabled in pipelines. If you really need to use them you need to execute them through the normal cluster client, if they are implemented and work there. Why can't multi key commands work? In short, again, it is because the keys can live in different slots on different nodes in the cluster. It is possible in theory to have any command work in a cluster, but only if the keys operated on belong to the same cluster slot. This lib has decided that currently no serious support for that will be attempted. + +Examples of commands that do not work are MGET, MSET and MOVE.
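If you need MGET-like behaviour, a workaround is to queue one single key command per key and read the combined result from `execute()`; a sketch (assuming `rc` is a `StrictRedisCluster` instance, with made-up key names):

```python
keys = ["user:1", "user:2", "user:3"]

p = rc.pipeline()
for key in keys:
    p.get(key)        # one single key GET per key instead of one MGET

values = p.execute()  # results come back in the order the GETs were queued
```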
+ +One good thing that comes out of blocking multi key commands is that correct execution order is less of a problem, and as long as it applies to each slot in the cluster we should be fine. + +Consider the following example. Create a pipeline and issue 6 commands A, B, C, D, E, F and then execute it. The pipeline is calculated and 2 sub pipelines are created, with A, C, D, F in the first and B, E in the second. Both pipelines are then sent to their node in the cluster and a response is sent back: from the first node [True, MovedException(12345), MovedException(12345), True] and from the second node [True, True]. After this response is parsed we see that 2 commands in the first pipeline did not work and must be sent to another node. This case happens if the client slots cache is wrong because a slot was migrated to another node in the cluster. After parsing the response we then build a third pipeline object with commands [C, D] to the second node. The third object is executed and passes, and from the client perspective the entire pipeline was executed. + +If we look back at the order we executed the commands, we get [A, F] for the first node and [B, E, C, D] for the second node. At first glance this looks like it is out of order because command E is executed before C & D. Why does this not matter? Because no multi key operations can be done in a pipeline, we only have to care that the execution order is correct for each slot, and in this case it was, because B & E belong to the same slot and C & D belong to the same slot. There should be no possible way to corrupt any data between slots if multi key commands are blocked by the code. + +What is good with this pipeline solution? First, we can actually have a pipeline solution that will work in most cases with few commands blocked (only multi key commands). Secondly, we can run it in parallel with threads or gevent to increase the performance of the pipeline even further, making the benefits even greater. + + + +## Transactions and WATCH + +Support for transactions and WATCH:es in pipelines. If we look at the entire pipeline across all nodes in the cluster, there is no possible way to have a complete transaction across all nodes, because if we need to issue commands to 3 servers, each server is handled on its own and there is no way to tell the other nodes to abort a transaction if only one of the nodes fails but not the others. A possible solution for that could be to implement a 2 step commit process. The 2 steps would consist of building 2 batches of commands for each node, where the first batch would consist of validating the state of each slot that the pipeline wants to operate on. If any of the slots is migrating or moved, then the client can correct its slots cache and issue a more correct pipeline batch. The second step would be to issue the actual commands, and the data would be committed to redis. The big problem with this is that while 99% of the time this would work really well if you have a very stable cluster with no migrations/resharding/servers down, there can be times where a slot has begun migration in between the 2 steps of the pipeline, and that would cause a race condition where the client thinks it has corrected the pipeline and wants to commit the data, but when it does it will still fail. + +Why won't `MULTI/EXEC` support work in a cluster environment? There is some test code in the `MULTI/EXEC cluster test code` section of this document that tests if `MULTI/EXEC` is possible to use in a cluster pipeline.
The tests show a huge problem when errors occur. If we wrap `MULTI/EXEC` in a packed set of commands, then if a slot is migrating we will not get a good error we can parse and use. Currently it will only report `True` or `False`, so we can narrow down what command failed but not why it failed. This might work really well if used on a non clustered node, because it does not have to take care of ASK/MOVED errors. But for a cluster we need to know what cluster error occurred so the correct action to fix the problem can be taken. Since there is more than 1 error to take care of, it is not possible to take action based on just `True` or `False`. + +Because of this problem with error handling, `MULTI/EXEC` is blocked hard in the code from being used in a pipeline, because the current implementation can't handle the errors. + +In theory it could be possible to design a pipeline implementation that can handle this case by trying to determine by itself what it should do with the error: either ask the cluster about the current state of the slot after a `False` value was found in the response, or just default to `MOVED` error handling and hope for the best. The problem is that this is not 100% guaranteed to work and can easily cause problems when the wrong action is taken on the response. + +Currently `WATCH` requires more study to determine if it is possible to use or not, but since it is tied into the `MULTI/EXEC` pattern it probably will not be supported for now. + + + +## MULTI/EXEC cluster test code + +This code does NOT wrap MULTI/EXEC around the commands when packed + +>>> from rediscluster import StrictRedisCluster as s +>>> r = s(startup_nodes=[{"host": "127.0.0.1", "port": "7002"}]) +>>> # Simulate that a slot is migrating to another node +>>> r.connection_pool.nodes.slots[14226] = {'host': '127.0.0.1', 'server_type': 'master', 'port': 7001, 'name': '127.0.0.1:7001'} +>>> p = r.pipeline() +>>> p.command_stack = [] +>>> p.command_stack.append((["SET", "ert", "tre"], {})) +>>> p.command_stack.append((["SET", "wer", "rew"], {})) +>>> p.execute() +ClusterConnection +[True, ResponseError('MOVED 14226 127.0.0.1:7002',)] +ClusterConnection +[True] +>>> + + + +This code DOES wrap MULTI/EXEC around the commands when packed + +>>> from rediscluster import StrictRedisCluster as s +>>> r = s(startup_nodes=[{"host": "127.0.0.1", "port": "7002"}]) +>>> # Simulate that a slot is migrating to another node +>>> r.connection_pool.nodes.slots[14226] = {'host': '127.0.0.1', 'server_type': 'master', 'port': 7001, 'name': '127.0.0.1:7001'} +>>> p = r.pipeline() +>>> p.command_stack = [] +>>> p.command_stack.append((["SET", "ert", "tre"], {})) +>>> p.command_stack.append((["SET", "wer", "rew"], {})) +>>> p.execute() +ClusterConnection +[True, False] +>>> + + + +## Different pipeline solutions + +This section will describe different types of pipeline solutions. It will list their main benefits and weaknesses. + +Note: This section is mostly random notes and thoughts and not that well written and cleaned up right now. It will be done at some point in the future. + +Simple but sequential pipeline. This solution acts more like an interface for the already existing pipeline implementation and only provides a simple backwards compatible interface to ensure that code that exists will still work without any major modifications. The good thing with this implementation is that because all commands are run in sequence, it will handle MOVED/ASK redirections very well and without any problems.
The major downside to this solution is that no commands are ever batched and run in parallel, and thus you do not get any major performance boost from this approach. Another plus is that execution order is preserved across the entire cluster, but a major downside is that the commands are no longer atomic on the cluster scale because they are sent as multiple commands to different nodes. + ++ Sequential execution of the entire pipeline +- No batching of commands aka. no execution speedup ++ Easy ASK/MOVED handling + + + + +Current pipeline implementation. This implementation is rather good and works well because it combines the existing pipeline interface and functionality, and it also provides a basic handling of ASK/MOVED errors inside the client. One major downside to this is that execution order is not preserved across the cluster. The execution order is somewhat broken if you look at the entire cluster level, because commands can be split so that cmd1, cmd3, cmd5 get sent to one server and cmd2, cmd4 get sent to another server. The order is then broken globally, but locally for each server it is preserved and maintained correctly. On the other hand I guess that there can't be any commands that can affect different hashslots within the same command, so it may not really matter if the execution order is not correct, because for each slot/key the order is valid. +There might be some issues with rebuilding the correct response ordering from the scattered data, because each command might be in different sub pipelines. But I think that our current code still handles this correctly. I think I have to figure out some weird case where the execution order actually matters. There might be some issues with the unsupported mget/mset commands that actually perform different sub commands than currently supported. + ++ Sequential execution per node +- Not sequential execution on the entire pipeline +- Medium difficult ASK/MOVED handling + + + + +There is an even simpler form of pipeline that can be made where all commands are supported, as long as they conform to the same hashslot, because redis supports that mode of operation. The good thing with this is that since all keys must belong to the same slot, there can only be very few ASK/MOVED errors, and if they happen they will be very easy to handle because the entire pipeline is more or less atomic: you talk to the same server and only 1 server, so there can't be any multiple server communication happening. + ++ Super simple ASK/MOVED handling ++ Sequential execution per slot and through the entire pipeline +- Single slot per pipeline + + + + +One other solution is the 2 step commit solution where you send 2 batches of commands to each server. The first batch should somehow establish that each keyslot is in the correct state and able to handle the data. After the client has received OK from all nodes that all data slots are good to use, it will actually send the real pipeline with all data and commands. The big problem with this approach is that there is a gap between the checking of the slots and the actual sending of the data where things can happen to the already established slots setup.
But at the same time there is no possibility of merging these 2 steps, because if step 2 is automatically run when step 1 is OK, the pipeline may fail on one node while it succeeds on the other nodes when it should not, because if one command gets an ASK/MOVED redirection then all pipeline objects must be rebuilt to match the new specs/setup and then be reissued by the client. The major advantage of this solution is that if you have total control of the redis server and do controlled upgrades when no clients are talking to the server, then it can actually work really well, because there is no possibility that ASK/MOVED will be triggered by migrations in between the 2 batches. + +- Big possibility of race conditions that can cause problems ++ Still rather safe because of the 2 step commit solution ++ Handles ASK/MOVED before committing the data diff --git a/docs/Pubsub.md b/docs/Pubsub.md index ff0bbe51..1b43195e 100644 --- a/docs/Pubsub.md +++ b/docs/Pubsub.md @@ -18,11 +18,11 @@ Discussion on this topic can be found here: https://groups.google.com/forum/?hl= -# How pubsub works in RedisCluster +# How pubsub works in StrictRedisCluster -In 0.2.0 a first solution to pubsub problem was implemented, but it contains some limitations. +In `0.2.0` a first solution to the pubsub problem was implemented, but it contains some limitations. -When a new RedisCluster instance is created it will now just after all slots is initialized determine what one node will be the pubsub node. Currently it will use the node with the highest port number. +When a new `StrictRedisCluster` instance is created, it will now, just after all slots are initialized, determine which node will be the pubsub node. Currently it will use the node with the highest port number. With this solution, pubsub will work in a cluster without any other major workarounds. diff --git a/docs/Threads.md b/docs/Threads.md index 13c083f6..0a7f77cf 100644 --- a/docs/Threads.md +++ b/docs/Threads.md @@ -20,7 +20,7 @@ We improve the situation by using python threads, making each request in paralle You can disable threaded execution either in the class constructor: ``` -r = rediscluster.RedisCluster( ... pipeline_use_threads=False) #true by default +r = rediscluster.StrictRedisCluster( ... pipeline_use_threads=False) #true by default pipe = r.pipeline() ``` Or you can disable it on a case by case basis as you instantiate the pipeline object. ``` pipe = r.pipeline(use_threads=False) ``` -The later example always overrides if explicitly set. Otherwise, it falls back on the value passed to the RedisCluster constructor. +The latter example always overrides if explicitly set. Otherwise, it falls back on the value passed to the StrictRedisCluster constructor. #### Footnote: Gevent Python offers something even more lightweight and efficient than threads to perform tasks in parallel: GEVENT. diff --git a/docs/Tox.md b/docs/Tox.md new file mode 100644 index 00000000..629e744b --- /dev/null +++ b/docs/Tox.md @@ -0,0 +1,11 @@ +# Tox - Multi environment testing + +Tox is the easiest way to run all tests because it will manage all dependencies and run the correct test command for you. + +TravisCI will use tox to run tests on all supported python & hiredis versions. + +Install tox with `pip install tox` + +To run all environments you need all supported python versions installed on your machine (see the supported python versions list), and you also need the python-dev package for all python versions to build hiredis.
+ +To run a specific python version use either `tox -e py27` or `tox -e py34` diff --git a/docs/Upgrading.md b/docs/Upgrading.md new file mode 100644 index 00000000..f06f7931 --- /dev/null +++ b/docs/Upgrading.md @@ -0,0 +1,16 @@ +# Upgrading redis-py-cluster + +This document will describe what must be done when upgrading between different versions to ensure that code still works. + + +## 0.2.0 --> 0.3.0 + +In the `0.3.0` release the name of the client class was changed from `RedisCluster` to `StrictRedisCluster`, and a new implementation of `RedisCluster` was added that is based on the `redis.Redis` class. This was done to enable implementing a cluster enabled version of the `redis.Redis` class. + +Because of this, all imports and usage of `RedisCluster` must be changed to `StrictRedisCluster` so that existing code will remain working. If this is not done, some issues could arise in existing code. + + + +## 0.1.0 --> 0.2.0 + +No major changes were done. diff --git a/rediscluster/__init__.py b/rediscluster/__init__.py index fe7d299f..70c87096 100644 --- a/rediscluster/__init__.py +++ b/rediscluster/__init__.py @@ -4,18 +4,20 @@ import sys # Import shortcut -from .client import RedisCluster +from .client import StrictRedisCluster, RedisCluster +from .cluster_mgt import RedisClusterMgt # NOQA from .pipeline import StrictClusterPipeline from .pubsub import ClusterPubSub # Monkey patch RedisCluster class into redis for easy access import redis +setattr(redis, "StrictRedisCluster", StrictRedisCluster) setattr(redis, "RedisCluster", RedisCluster) setattr(redis, "ClusterPubSub", ClusterPubSub) setattr(redis, "StrictClusterPipeline", StrictClusterPipeline) # Major, Minor, Fix version -__version__ = (0, 2, 0) +__version__ = (0, 3, 0) if sys.version_info[0:3] == (3, 4, 0): raise RuntimeError("CRITICAL: rediscluster do not work with python 3.4.0. Please use 3.4.1 or higher.") diff --git a/rediscluster/client.py b/rediscluster/client.py index d263492a..f5a80eb2 100644 --- a/rediscluster/client.py +++ b/rediscluster/client.py @@ -1,6 +1,7 @@ # -*- coding: utf-8 -*- # python std lib +import datetime import random import string import time @@ -25,7 +26,7 @@ from redis.exceptions import RedisError, ResponseError, TimeoutError, DataError, ConnectionError, BusyLoadingError -class RedisCluster(StrictRedis): +class StrictRedisCluster(StrictRedis): """ If a command is implemented over the one in StrictRedis then it requires some changes compared to the regular implementation of the method.
@@ -94,7 +95,7 @@ def __init__(self, host=None, port=None, startup_nodes=None, max_connections=32, Some kwargs is not supported and will raise RedisClusterException - db (Redis do not support database SELECT in cluster mode) """ - super(RedisCluster, self).__init__(**kwargs) + super(StrictRedisCluster, self).__init__(**kwargs) # Tweaks to StrictRedis client arguments when running in cluster mode if "db" in kwargs: @@ -136,7 +137,7 @@ def handle_cluster_command_exception(self, e): - MOVED: Updates the slots cache with the new ip:port - ASK: Returns a dict with ip:port where to connect to try again """ - errv = RedisCluster._exception_message(e) + errv = StrictRedisCluster._exception_message(e) if errv is None: raise e @@ -189,7 +190,7 @@ def parse_redirection_exception_msg(self, errv): a = errv[2].split(":") return {"action": errv[0], "slot": int(errv[1]), "host": a[0], "port": int(a[1])} - def pubsub(self, *args, **kwargs): + def pubsub(self, **kwargs): return ClusterPubSub(self.connection_pool, **kwargs) def pipeline(self, transaction=None, shard_hint=None, use_threads=None): @@ -219,9 +220,9 @@ def transaction(self, func, *watches, **kwargs): """ Transaction is not implemented in cluster mode yet. """ - raise RedisClusterException("method RedisCluster.transaction() is not implemented") + raise RedisClusterException("method StrictRedisCluster.transaction() is not implemented") - def _determine_slot(self, *args, **kwargs): + def _determine_slot(self, *args): """ figure out what slot based on command and args """ @@ -272,7 +273,7 @@ def execute_command(self, *args, **kwargs): action = {} command = args[0] try_random_node = False - slot = self._determine_slot(*args, **kwargs) + slot = self._determine_slot(*args) ttl = int(self.RedisClusterRequestTTL) while ttl > 0: ttl -= 1 @@ -335,7 +336,7 @@ def scan_iter(self, match=None, count=None): """ cursor = '0' while cursor != 0: - for node, node_data in self.scan(cursor=cursor, match=match, count=count).items(): + for _, node_data in self.scan(cursor=cursor, match=match, count=count).items(): cursor, data = node_data for item in data: yield item @@ -386,7 +387,7 @@ def msetnx(self, *args, **kwargs): kwargs.update(args[0]) # Itterate over all items and fail fast if one value is True. - for k, v in kwargs.items(): + for k, _ in kwargs.items(): if self.get(k): return False @@ -804,4 +805,92 @@ def register_script(self, script): raise RedisClusterException("Method register_script is not possible to use in a redis cluster") +class RedisCluster(StrictRedisCluster): + """ + Provides backwards compatibility with older versions of redis-py that + changed arguments to some commands to be more Pythonic, sane, or by + accident. + """ + # Overridden callbacks + RESPONSE_CALLBACKS = dict_merge( + StrictRedis.RESPONSE_CALLBACKS, + { + 'TTL': lambda r: r >= 0 and r or None, + 'PTTL': lambda r: r >= 0 and r or None, + } + ) + + def pipeline(self, transaction=True, shard_hint=None, use_threads=None): + """ + Return a new pipeline object that can queue multiple commands for + later execution. ``transaction`` indicates whether all commands + should be executed atomically. Apart from making a group of operations + atomic, pipelines are useful for reducing the back-and-forth overhead + between the client and server. 
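+        Note that in this cluster implementation both ``transaction=True`` and ``shard_hint`` are rejected with a RedisClusterException, as the checks directly below show.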
+ """ + if shard_hint: + raise RedisClusterException("shard_hint is deprecated in cluster mode") + + if transaction: + raise RedisClusterException("transaction is deprecated in cluster mode") + + return StrictClusterPipeline( + connection_pool=self.connection_pool, + startup_nodes=self.connection_pool.nodes.startup_nodes, + refresh_table_asap=self.refresh_table_asap, + nodes_callbacks=self.nodes_callbacks, + result_callbacks=self.result_callbacks, + response_callbacks=self.response_callbacks, + use_threads=self.pipeline_use_threads if use_threads is None else use_threads + ) + + def setex(self, name, value, time): + """ + Set the value of key ``name`` to ``value`` that expires in ``time`` + seconds. ``time`` can be represented by an integer or a Python + timedelta object. + """ + if isinstance(time, datetime.timedelta): + time = time.seconds + time.days * 24 * 3600 + return self.execute_command('SETEX', name, time, value) + + def lrem(self, name, value, num=0): + """ + Remove the first ``num`` occurrences of elements equal to ``value`` + from the list stored at ``name``. + The ``num`` argument influences the operation in the following ways: + num > 0: Remove elements equal to value moving from head to tail. + num < 0: Remove elements equal to value moving from tail to head. + num = 0: Remove all elements equal to value. + """ + return self.execute_command('LREM', name, num, value) + + def zadd(self, name, *args, **kwargs): + """ + NOTE: The order of arguments differs from that of the official ZADD + command. For backwards compatibility, this method accepts arguments + in the form of name1, score1, name2, score2, while the official Redis + documentation expects score1, name1, score2, name2. + If you're looking to use the standard syntax, consider using the + StrictRedis class. See the API Reference section of the docs for more + information. + Set any number of element-name, score pairs to the key ``name``. Pairs + can be specified in two ways: + As *args, in the form of: name1, score1, name2, score2, ... + or as **kwargs, in the form of: name1=score1, name2=score2, ...
+ The following example would add four values to the 'my-key' key: + redis.zadd('my-key', 'name1', 1.1, 'name2', 2.2, name3=3.3, name4=4.4) + """ + pieces = [] + if args: + if len(args) % 2 != 0: + raise RedisError("ZADD requires an equal number of " + "values and scores") + pieces.extend(reversed(args)) + for pair in iteritems(kwargs): + pieces.append(pair[1]) + pieces.append(pair[0]) + return self.execute_command('ZADD', name, *pieces) + + from rediscluster.pipeline import StrictClusterPipeline diff --git a/rediscluster/cluster_mgt.py b/rediscluster/cluster_mgt.py new file mode 100644 index 00000000..4b3f47f2 --- /dev/null +++ b/rediscluster/cluster_mgt.py @@ -0,0 +1,107 @@ +# -*- coding: utf-8 -*- +from collections import defaultdict + +from .connection import ClusterConnectionPool +from .exceptions import RedisClusterException +from .utils import clusterdown_wrapper, first_key, nslookup + + +class RedisClusterMgt(object): + + blocked_args = ('addslots', 'count_failure_reports', + 'countkeysinslot', 'delslots', 'failover', 'forget', + 'getkeysinslot', 'keyslot', 'meet', 'replicate', 'reset', + 'saveconfig', 'set_config_epoch', 'setslot', 'slaves') + + def __init__(self, startup_nodes=None, **kwargs): + self.connection_pool = ClusterConnectionPool( + startup_nodes=startup_nodes, + init_slot_cache=True, **kwargs + ) + + def __getattr__(self, attr): + if attr in self.blocked_args: + raise RedisClusterException('%s is currently not supported' % attr) + raise RedisClusterException('%s is not a valid Redis cluster argument' % attr) + + @clusterdown_wrapper + def _execute_command_on_nodes(self, nodes, *args, **kwargs): + command = args[0] + res = {} + for node in nodes: + c = self.connection_pool.get_connection_by_node(node) + try: + c.send_command(*args) + res[node["name"]] = c.read_response() + finally: + self.connection_pool.release(c) + return first_key(command, res) + + def _execute_cluster_commands(self, *args, **kwargs): + args = ('cluster',) + args + node = self.connection_pool.nodes.random_node() + return self._execute_command_on_nodes([node], *args, **kwargs) + + def info(self): + raw = self._execute_cluster_commands('info') + + def _split(line): + k, v = line.split(':') + yield k + yield v + return {k: v for k, v in + [_split(line) for line in raw.split('\r\n') if line]} + + def _make_host(self, host, port): + return '%s:%s' % (host, port) + + def slots(self, host_required=False): + slots_info = self._execute_cluster_commands('slots') + master_slots = defaultdict(list) + slave_slots = defaultdict(list) + for item in slots_info: + master_ip, master_port = item[2] + slots = [item[0], item[1]] + master_host = nslookup(master_ip) if host_required else master_ip + master_slots[self._make_host(master_host, master_port)].append(slots) + slaves = item[3:] + for slave_ip, slave_port in slaves: + slave_host = nslookup(slave_ip) if host_required else slave_ip + slave_slots[self._make_host(slave_host, slave_port)].append(slots) + + return { + 'master': master_slots, + 'slave': slave_slots + } + + def _parse_node_line(self, line): + line_items = line.split(' ') + ret = line_items[:8] + slots = [sl.split('-') for sl in line_items[8:]] + ret.append(slots) + return ret + + def nodes(self, host_required=False): + raw = self._execute_cluster_commands('nodes') + ret = {} + for line in raw.split('\n'): + if not line: + continue + node_id, ip_port, flags, master_id, ping, pong, epoch, \ + status, slots = self._parse_node_line(line) + role = flags + if ',' in flags: + _, role = flags.split(',') + + host 
= nslookup(ip_port) if host_required else ip_port + ret[host] = { + 'node_id': node_id, + 'role': role, + 'master_id': master_id, + 'last_ping_sent': ping, + 'last_pong_rcvd': pong, + 'epoch': epoch, + 'status': status, + 'slots': slots + } + return ret diff --git a/rediscluster/nodemanager.py b/rediscluster/nodemanager.py index 2df64467..8a48aec7 100644 --- a/rediscluster/nodemanager.py +++ b/rediscluster/nodemanager.py @@ -80,10 +80,12 @@ def initialize(self): self.flush_slots_cache() all_slots_covered = False disagreements = [] + startup_nodes_reachable = False for node in self.startup_nodes: try: r = self.get_redis_link(host=node["host"], port=node["port"], decode_responses=True) cluster_slots = r.execute_command("cluster", "slots") + startup_nodes_reachable = True except ConnectionError: continue except Exception: @@ -93,8 +95,7 @@ # If there's only one server in the cluster, its ``host`` is '' # Fix it to the host in startup_nodes - if (len(cluster_slots) == 1 and len(cluster_slots[0][2][0]) == 0 - and len(self.startup_nodes) == 1): + if (len(cluster_slots) == 1 and len(cluster_slots[0][2][0]) == 0 and len(self.startup_nodes) == 1): cluster_slots[0][2][0] = self.startup_nodes[0]['host'] # No need to decode response because StrictRedis should handle that for us... @@ -137,8 +138,12 @@ self.determine_pubsub_node() return + if not startup_nodes_reachable: + raise RedisClusterException("Cannot connect to Redis Cluster. Please provide at least one reachable node.") + if not all_slots_covered: - raise RedisClusterException("All slots are not covered after query all startup_nodes. {} of {} covered...".format(len(self.slots), self.RedisClusterHashSlots)) + raise RedisClusterException("All slots are not covered after querying all startup_nodes.
{} of {} covered...".format( + len(self.slots), self.RedisClusterHashSlots)) def determine_pubsub_node(self): """ diff --git a/rediscluster/pipeline.py b/rediscluster/pipeline.py index 65ea4bb8..08ccedd3 100644 --- a/rediscluster/pipeline.py +++ b/rediscluster/pipeline.py @@ -7,7 +7,7 @@ import threading # rediscluster imports -from .client import RedisCluster +from .client import StrictRedisCluster from .connection import by_node_context from .exceptions import RedisClusterException, ClusterDownException from .utils import clusterdown_wrapper @@ -18,13 +18,15 @@ from redis._compat import imap, unicode, xrange -class StrictClusterPipeline(RedisCluster): +class StrictClusterPipeline(StrictRedisCluster): """ """ - def __init__(self, connection_pool, nodes_callbacks=None, result_callbacks=None, response_callbacks=None, startup_nodes=[], connections=[], opt={}, refresh_table_asap=False, slots={}, nodes=[], use_threads=True): + def __init__(self, connection_pool, nodes_callbacks=None, result_callbacks=None, + response_callbacks=None, startup_nodes=None, refresh_table_asap=False, + use_threads=True): self.connection_pool = connection_pool - self.startup_nodes = startup_nodes + self.startup_nodes = startup_nodes if startup_nodes else [] self.refresh_table_asap = refresh_table_asap self.command_stack = [] @@ -134,7 +136,7 @@ def send_cluster_commands(self, stack, raise_on_error=True, allow_redirections=T # Keep this section so that we can determine what nodes to contact for i in attempt: c = stack[i] - slot = self._determine_slot(*c[0], **c[1]) + slot = self._determine_slot(*c[0]) if slot in ask_slots: node = ask_slots[slot] else: @@ -191,7 +193,7 @@ def send_cluster_commands(self, stack, raise_on_error=True, allow_redirections=T time.sleep(0.1) continue - errv = RedisCluster._exception_message(v) + errv = StrictRedisCluster._exception_message(v) if errv is None: continue diff --git a/rediscluster/utils.py b/rediscluster/utils.py index 0d07f60d..616d8be3 100644 --- a/rediscluster/utils.py +++ b/rediscluster/utils.py @@ -1,4 +1,5 @@ # -*- coding: utf-8 -*- +from socket import gethostbyaddr # rediscluster imports from .exceptions import RedisClusterException, ClusterDownException @@ -25,7 +26,9 @@ def dict_merge(*dicts): Merge all provided dicts into 1 dict. """ merged = {} - [merged.update(d) for d in dicts if is_dict(d)] + for d in dicts: + if is_dict(d): + merged.update(d) return merged @@ -46,7 +49,7 @@ def merge_result(command, res): is_dict(res) result = set([]) - for k, v in res.items(): + for _, v in res.items(): for value in v: result.add(value) return list(result) @@ -77,7 +80,7 @@ def clusterdown_wrapper(func): It will try 3 times to rerun the command and raises ClusterDownException if it continues to fail. """ def inner(*args, **kwargs): - for i in range(0, 3): + for _ in range(0, 3): try: return func(*args, **kwargs) except ClusterDownException: @@ -88,3 +91,10 @@ def inner(*args, **kwargs): # If it fails 3 times then raise exception back to caller raise ClusterDownException("CLUSTERDOWN error. 
Unable to rebuild the cluster") return inner + + +def nslookup(node_ip): + if ':' not in node_ip: + return gethostbyaddr(node_ip)[0] + ip, port = node_ip.split(':') + return '%s:%s' % (gethostbyaddr(ip)[0], port) diff --git a/setup.py b/setup.py index abec2d96..133249b5 100644 --- a/setup.py +++ b/setup.py @@ -13,8 +13,6 @@ if os.getenv('USER', '').lower() == 'vagrant': del os.link -with open("docs/ALPHA.md") as f: - alpha = f.read() with open('README.md') as f: readme = f.read() with open('CHANGES') as f: @@ -22,9 +20,9 @@ setup( name="redis-py-cluster", - version="0.2.0", + version="0.3.0", description="Cluster library for redis 3.0.0 built on top of redis-py lib", - long_description=alpha + '\n\n' + readme + '\n\n' + history, + long_description=readme + '\n\n' + history, author="Johan Andersson", author_email="Grokzen@gmail.com", maintainer='Johan Andersson', @@ -44,8 +42,8 @@ # 'Development Status :: 1 - Planning', # 'Development Status :: 2 - Pre-Alpha', # 'Development Status :: 3 - Alpha', - 'Development Status :: 4 - Beta', - # 'Development Status :: 5 - Production/Stable', + # 'Development Status :: 4 - Beta', + 'Development Status :: 5 - Production/Stable', # 'Development Status :: 6 - Mature', # 'Development Status :: 7 - Inactive', 'Programming Language :: Python', diff --git a/tests/conftest.py b/tests/conftest.py index 7bf0ce74..6920fbbf 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -6,7 +6,7 @@ import json # rediscluster imports -from rediscluster import RedisCluster +from rediscluster import StrictRedisCluster, RedisCluster, RedisClusterMgt # 3rd party imports import pytest @@ -28,14 +28,17 @@ def get_versions(**kwargs): return _REDIS_VERSIONS[key] -def _get_client(**kwargs): +def _get_client(cls=None, **kwargs): + if not cls: + cls = RedisCluster + params = {'startup_nodes': [{'host': '127.0.0.1', 'port': 7000}], 'socket_timeout': 10, 'decode_responses': False} params.update(kwargs) - return RedisCluster(**params) + return cls(**params) -def _init_client(request, **kwargs): - client = _get_client(**kwargs) +def _init_client(request, cls=None, **kwargs): + client = _get_client(cls=cls, **kwargs) client.flushdb() if request: def teardown(): @@ -45,6 +48,15 @@ def teardown(): return client +def _init_mgt_client(request, cls=None, **kwargs): + client = _get_client(cls=cls, **kwargs) + if request: + def teardown(): + client.connection_pool.disconnect() + request.addfinalizer(teardown) + return client + + def skip_if_server_version_lt(min_version): versions = get_versions() for version in versions.values(): @@ -64,23 +76,24 @@ def skip_if_redis_py_version_lt(min_version): @pytest.fixture() def o(request, **kwargs): """ - Create a Rediscluster instance with decode_responses set to True. + Create a StrictRedisCluster instance with decode_responses set to True. """ - return _init_client(request, decode_responses=True, **kwargs) + return _init_client(request, cls=StrictRedisCluster, decode_responses=True, + **kwargs) @pytest.fixture() def r(request, **kwargs): """ - Create a Rediscluster instance with default settings. + Create a StrictRedisCluster instance with default settings. 
""" - return _init_client(request, **kwargs) + return _init_client(request, cls=StrictRedisCluster, **kwargs) @pytest.fixture() def s(request, **kwargs): """ - Create a RedisCluster instance with 'init_slot_cache' set to false + Create a StrictRedisCluster instance with 'init_slot_cache' set to false """ s = _get_client(init_slot_cache=False, **kwargs) assert s.connection_pool.nodes.slots == {} @@ -94,3 +107,20 @@ def t(request, *args, **kwargs): Create a regular StrictRedis object instance """ return StrictRedis(*args, **kwargs) + + +@pytest.fixture() +def sr(request, *args, **kwargs): + """ + Returns a instance of StrictRedisCluster + """ + return _init_client(request, cls=StrictRedisCluster, **kwargs) + + +@pytest.fixture() +def rcm(request, *args, **kwargs): + """ + Returns a instance of RedisClusterMgt + """ + return _init_mgt_client(request, cls=RedisClusterMgt, decode_responses=True, + **kwargs) diff --git a/tests/test_cluster_obj.py b/tests/test_cluster_obj.py index 29441524..0dd9fa29 100644 --- a/tests/test_cluster_obj.py +++ b/tests/test_cluster_obj.py @@ -5,7 +5,7 @@ import re # rediscluster imports -from rediscluster import RedisCluster +from rediscluster import StrictRedisCluster from rediscluster.connection import ClusterConnectionPool from rediscluster.exceptions import RedisClusterException from rediscluster.nodemanager import NodeManager @@ -22,7 +22,7 @@ def test_representation(r): - assert re.search('^RedisCluster<[0-9\.\:\,].+>$', str(r)) + assert re.search('^StrictRedisCluster<[0-9\.\:\,].+>$', str(r)) def test_blocked_strict_redis_args(): @@ -30,7 +30,7 @@ def test_blocked_strict_redis_args(): Some arguments should explicitly be blocked because they will not work in a cluster setup """ params = {'startup_nodes': [{'host': '127.0.0.1', 'port': 7000}]} - c = RedisCluster(**params) + c = StrictRedisCluster(**params) assert c.connection_pool.connection_kwargs["socket_timeout"] == ClusterConnectionPool.RedisClusterDefaultTimeout with pytest.raises(RedisClusterException) as ex: @@ -44,7 +44,7 @@ def test_host_port_startup_node(): """ h = "192.168.0.1" p = 7000 - c = RedisCluster(host=h, port=p, init_slot_cache=False) + c = StrictRedisCluster(host=h, port=p, init_slot_cache=False) assert {"host": h, "port": p} in c.connection_pool.nodes.startup_nodes @@ -84,7 +84,7 @@ def test_blocked_transaction(r): """ with pytest.raises(RedisClusterException) as ex: r.transaction(None) - assert unicode(ex.value).startswith("method RedisCluster.transaction() is not implemented"), unicode(ex.value) + assert unicode(ex.value).startswith("method StrictRedisCluster.transaction() is not implemented"), unicode(ex.value) def test_cluster_of_one_instance(): @@ -93,10 +93,10 @@ def test_cluster_of_one_instance(): one server. There is another redis server joining the cluster, hold slot 0, and - eventually quit the cluster. The RedisCluster instance may get confused + eventually quit the cluster. The StrictRedisCluster instance may get confused when slots mapping and nodes change during the test. 
""" - with patch.object(RedisCluster, 'parse_response') as parse_response_mock: + with patch.object(StrictRedisCluster, 'parse_response') as parse_response_mock: with patch.object(NodeManager, 'initialize', autospec=True) as init_mock: def side_effect(self, *args, **kwargs): def ok_call(self, *args, **kwargs): @@ -141,7 +141,7 @@ def map_7007(self): parse_response_mock.side_effect = side_effect init_mock.side_effect = side_effect_rebuild_slots_cache - rc = RedisCluster(host='127.0.0.1', port=7006) + rc = StrictRedisCluster(host='127.0.0.1', port=7006) rc.set("foo", "bar") ##### @@ -150,7 +150,7 @@ def map_7007(self): parse_response_mock.side_effect = side_effect init_mock.side_effect = side_effect_rebuild_slots_cache - rc = RedisCluster(host='127.0.0.1', port=7006) + rc = StrictRedisCluster(host='127.0.0.1', port=7006) p = rc.pipeline() p.set("bar", "foo") p.execute() @@ -204,7 +204,7 @@ def test_clusterdown_exception_handling(): """ with patch.object(ClusterConnectionPool, 'disconnect') as mock_disconnect: with patch.object(ClusterConnectionPool, 'reset') as mock_reset: - r = RedisCluster(host="127.0.0.1", port=7000) + r = StrictRedisCluster(host="127.0.0.1", port=7000) i = len(mock_reset.mock_calls) assert r.handle_cluster_command_exception(Exception("CLUSTERDOWN")) == {"method": "clusterdown"} @@ -239,7 +239,7 @@ def test_refresh_table_asap(): with patch.object(NodeManager, 'initialize') as mock_initialize: mock_initialize.return_value = None - r = RedisCluster(host="127.0.0.1", port=7000) + r = StrictRedisCluster(host="127.0.0.1", port=7000) r.connection_pool.nodes.slots[12182] = { "host": "127.0.0.1", "port": 7002, @@ -263,7 +263,7 @@ def test_ask_redirection(): Important thing to verify is that it tries to talk to the second node. """ - r = RedisCluster(host="127.0.0.1", port=7000) + r = StrictRedisCluster(host="127.0.0.1", port=7000) m = Mock(autospec=True) @@ -293,7 +293,7 @@ def test_ask_redirection_pipeline(): Important thing to verify is that it tries to talk to the second node. """ - r = RedisCluster(host="127.0.0.1", port=7000) + r = StrictRedisCluster(host="127.0.0.1", port=7000) p = r.pipeline() m = Mock(autospec=True) @@ -325,7 +325,7 @@ def test_moved_redirection(): Important thing to verify is that it tries to talk to the second node. """ - r = RedisCluster(host="127.0.0.1", port=7000) + r = StrictRedisCluster(host="127.0.0.1", port=7000) m = Mock(autospec=True) def ask_redirect_effect(connection, command_name, **options): @@ -354,7 +354,7 @@ def test_moved_redirection_pipeline(): Important thing to verify is that it tries to talk to the second node. 
""" - r = RedisCluster(host="127.0.0.1", port=7000) + r = StrictRedisCluster(host="127.0.0.1", port=7000) p = r.pipeline() m = Mock(autospec=True) diff --git a/tests/test_commands.py b/tests/test_commands.py index 69545d64..e29aa58d 100644 --- a/tests/test_commands.py +++ b/tests/test_commands.py @@ -1229,39 +1229,39 @@ def test_sort_all_options(self, r): class TestStrictCommands(object): - def test_strict_zadd(self, r): - r.zadd('a', 1.0, 'a1', 2.0, 'a2', a3=3.0) - assert r.zrange('a', 0, -1, withscores=True) == \ + def test_strict_zadd(self, sr): + sr.zadd('a', 1.0, 'a1', 2.0, 'a2', a3=3.0) + assert sr.zrange('a', 0, -1, withscores=True) == \ [(b('a1'), 1.0), (b('a2'), 2.0), (b('a3'), 3.0)] - def test_strict_lrem(self, r): - r.rpush('a', 'a1', 'a2', 'a3', 'a1') - r.lrem('a', 0, 'a1') - assert r.lrange('a', 0, -1) == [b('a2'), b('a3')] - - def test_strict_setex(self, r): - assert r.setex('a', 60, '1') - assert r['a'] == b('1') - assert 0 < r.ttl('a') <= 60 - - def test_strict_ttl(self, r): - assert not r.expire('a', 10) - r['a'] = '1' - assert r.expire('a', 10) - assert 0 < r.ttl('a') <= 10 - assert r.persist('a') - assert r.ttl('a') == -1 - - def test_strict_pttl(self, r): - assert not r.pexpire('a', 10000) - r['a'] = '1' - assert r.pexpire('a', 10000) - assert 0 < r.pttl('a') <= 10000 - assert r.persist('a') - assert r.pttl('a') == -1 - - def test_eval(self, r): - res = r.eval("return {KEYS[1],KEYS[2],ARGV[1],ARGV[2]}", 2, "A{foo}", "B{foo}", "first", "second") + def test_strict_lrem(self, sr): + sr.rpush('a', 'a1', 'a2', 'a3', 'a1') + sr.lrem('a', 0, 'a1') + assert sr.lrange('a', 0, -1) == [b('a2'), b('a3')] + + def test_strict_setex(self, sr): + assert sr.setex('a', 60, '1') + assert sr['a'] == b('1') + assert 0 < sr.ttl('a') <= 60 + + def test_strict_ttl(self, sr): + assert not sr.expire('a', 10) + sr['a'] = '1' + assert sr.expire('a', 10) + assert 0 < sr.ttl('a') <= 10 + assert sr.persist('a') + assert sr.ttl('a') == -1 + + def test_strict_pttl(self, sr): + assert not sr.pexpire('a', 10000) + sr['a'] = '1' + assert sr.pexpire('a', 10000) + assert 0 < sr.pttl('a') <= 10000 + assert sr.persist('a') + assert sr.pttl('a') == -1 + + def test_eval(self, sr): + res = sr.eval("return {KEYS[1],KEYS[2],ARGV[1],ARGV[2]}", 2, "A{foo}", "B{foo}", "first", "second") assert(res[0] == b('A{foo}')) assert(res[1] == b('B{foo}')) assert(res[2] == b('first')) diff --git a/tests/test_node_manager.py b/tests/test_node_manager.py index a90a55bc..87b920d5 100644 --- a/tests/test_node_manager.py +++ b/tests/test_node_manager.py @@ -5,7 +5,7 @@ # rediscluster imports from tests.conftest import skip_if_server_version_lt -from rediscluster import RedisCluster +from rediscluster import StrictRedisCluster from rediscluster.exceptions import RedisClusterException from rediscluster.nodemanager import NodeManager @@ -147,7 +147,7 @@ def monkey_link(host=None, port=None, decode_responses=False): else: result = [] - r = RedisCluster(host=host, port=port, decode_responses=True) + r = StrictRedisCluster(host=host, port=port, decode_responses=True) orig_execute_command = r.execute_command def execute_command(*args, **kwargs): @@ -239,7 +239,7 @@ def test_cluster_slots_error(): Check that exception is raised if initialize can't execute 'CLUSTER SLOTS' command. 
""" - with patch.object(RedisCluster, 'execute_command') as execute_command_mock: + with patch.object(StrictRedisCluster, 'execute_command') as execute_command_mock: execute_command_mock.side_effect = Exception("foobar") n = NodeManager(startup_nodes=[{}]) @@ -311,6 +311,7 @@ def test_cluster_one_instance(): def test_init_with_down_node(): """ If I can't connect to one of the nodes, everything should still work. + But if I can't connect to any of the nodes, exception should be thrown. """ def get_redis_link(host, port, decode_responses=False): if port == 7000: @@ -326,4 +327,4 @@ def get_redis_link(host, port, decode_responses=False): n = NodeManager(startup_nodes=[{"host": "127.0.0.1", "port": 7000}]) with pytest.raises(RedisClusterException) as e: n.initialize() - assert 'All slots are not covered' in unicode(e.value) + assert 'Redis Cluster cannot be connected' in unicode(e.value) diff --git a/tests/test_redis_cluster_mgt.py b/tests/test_redis_cluster_mgt.py new file mode 100644 index 00000000..a50e6a68 --- /dev/null +++ b/tests/test_redis_cluster_mgt.py @@ -0,0 +1,27 @@ +# -*- coding: utf-8 -*- + + +class TestRedisClusterMgt(object): + + def test_info(self, rcm): + info = rcm.info() + assert 'cluster_state' in info + + def test_slots(self, rcm): + slots = rcm.slots() + assert 'master' in slots + assert 'slave' in slots + + master_slots = slots['master'] + for host, slots in master_slots.items(): + s = slots[0] + # node can have multiple slots + # as a result, the format is [[1, 2], [3, 4]] + assert isinstance(s, list) + assert len(s) == 2 + + def test_nodes(self, rcm): + nodes = rcm.nodes() + for host, info in nodes.items(): + assert 'role' in info + assert 'slots' in info diff --git a/tox.ini b/tox.ini index af622639..9addb9d8 100644 --- a/tox.ini +++ b/tox.ini @@ -54,10 +54,10 @@ deps = basepython= python3.4 deps = flake8==2.2.5 -commands = flake8 --show-source --exclude=.venv,.tox,dist,docs,build,.git --ignore=E501 . +commands = flake8 --show-source --exclude=.venv,.tox,dist,docs,build,.git --ignore=E501,E731,E402 . [testenv:flake8-py27] basepython= python2.7 deps = flake8==2.2.5 -commands = flake8 --show-source --exclude=.venv,.tox,dist,docs,build,.git --ignore=E501 . +commands = flake8 --show-source --exclude=.venv,.tox,dist,docs,build,.git --ignore=E501,E731,E402 .