Skip to content

Commit

Permalink
Version 0.8.0
Browse files Browse the repository at this point in the history
  • Loading branch information
merretbuurman committed May 25, 2020
1 parent ce6d185 commit 0482a49
Show file tree
Hide file tree
Showing 84 changed files with 2,125 additions and 511 deletions.
8 changes: 7 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@

esgf-pid is a python library that is part of the ESGF data publication. It is responsible for triggering PID creation. To do this, it sends JSON messages to a RabbitMQ Messaging Queue.

The library is tested using python 2.7.
The library is tested using python 2.7 and recently using python 3.7.

Up to version 0.7.17, pika==0.11.2 should be used. After that, pika>=1.0.1 should be used.


Access to a RabbitMQ server with the necessary queues is required. The server data has to be provided to the library.

Expand All @@ -11,3 +14,6 @@ Access to a RabbitMQ server with the necessary queues is required. The server da
* Merret Buurman (DKRZ)

* Katharina Berger (DKRZ)

* mauzey1

20 changes: 20 additions & 0 deletions README.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
esgf-pid
==========

esgf-pid is a python library that is part of the ESGF data publication. It is responsible for triggering PID creation. To do this, it sends JSON messages to a RabbitMQ Messaging Queue.

The library is tested using python 2.7 and recently using python 3.7.

Up to version 0.7.17, pika==0.11.2 should be used. After that, pika>=1.0.1 should be used.

Access to a RabbitMQ server with the necessary queues is required. The server data has to be provided to the library.

Collaborators
-----------------

* Merret Buurman (DKRZ)

* Katharina Berger (DKRZ)

* mauzey1

20 changes: 11 additions & 9 deletions docs/source/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
# add these directories to sys.path here. If the directory is relative to the
# documentation root, use os.path.abspath to make it absolute, like shown here.
#


import os
import sys
sys.path.insert(0, os.path.abspath('../..'))
Expand Down Expand Up @@ -55,18 +57,18 @@
master_doc = 'index'

# General information about the project.
project = u'ESGF PID Consumer Tests'
copyright = u'2016, Merret Buurman (DKRZ)'
author = u'DKRZ'
project = 'ESGF PID Consumer Tests'
copyright = '2016, Merret Buurman (DKRZ)'
author = 'DKRZ'

# The version info for the project you're documenting, acts as replacement for
# |version| and |release|, also used in various other places throughout the
# built documents.
#
# The short X.Y version.
version = u'0.1'
version = '0.1'
# The full version, including alpha/beta/rc tags.
release = u'0.1'
release = '0.1'

# The language for content autogenerated by Sphinx. Refer to documentation
# for a list of supported languages.
Expand Down Expand Up @@ -267,8 +269,8 @@
# (source start file, target name, title,
# author, documentclass [howto, manual, or own class]).
latex_documents = [
(master_doc, 'ESGFPIDConsumerTests.tex', u'ESGF PID Consumer Tests Documentation',
u'Merret Buurman (DKRZ)', 'manual'),
(master_doc, 'ESGFPIDConsumerTests.tex', 'ESGF PID Consumer Tests Documentation',
'Merret Buurman (DKRZ)', 'manual'),
]

# The name of an image file (relative to this directory) to place at the top of
Expand Down Expand Up @@ -309,7 +311,7 @@
# One entry per manual page. List of tuples
# (source start file, name, description, authors, manual section).
man_pages = [
(master_doc, 'esgfpidconsumertests', u'ESGF PID Consumer Tests Documentation',
(master_doc, 'esgfpidconsumertests', 'ESGF PID Consumer Tests Documentation',
[author], 1)
]

Expand All @@ -324,7 +326,7 @@
# (source start file, target name, title, author,
# dir menu entry, description, category)
texinfo_documents = [
(master_doc, 'ESGFPIDConsumerTests', u'ESGF PID Consumer Tests Documentation',
(master_doc, 'ESGFPIDConsumerTests', 'ESGF PID Consumer Tests Documentation',
author, 'ESGFPIDConsumerTests', 'One line description of project.',
'Miscellaneous'),
]
Expand Down
2 changes: 1 addition & 1 deletion esgfpid/assistant/consistency.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ def __decide_whether_run_check_and_inform(self):
self.__log_no_useful_info()

def __log_solr_error_occured(self, exception):
msg = ('No dataset integrity test could be run. Message: '+exception.message)
msg = ('No dataset integrity test could be run. Message: '+repr(exception))
self.__message_why_not = msg
loginfo(LOGGER, msg)

Expand Down
2 changes: 1 addition & 1 deletion esgfpid/assistant/datacart.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ def __init__(self, **args):
'''
@staticmethod
def _get_handle_string_for_datacart(dict_of_drs_ids_and_pids, prefix):
list_of_drs_ids = dict_of_drs_ids_and_pids.keys()
list_of_drs_ids = list(dict_of_drs_ids_and_pids.keys())
hash_basis = esgfpid.utils.make_sorted_lowercase_list_without_hdl(list_of_drs_ids)
return esgfpid.utils.make_handle_from_list_of_strings(hash_basis, prefix, addition='datacart')
# This sorts the list, removes all "hdl:", and makes a hash
Expand Down
2 changes: 1 addition & 1 deletion esgfpid/assistant/publish.py
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@ def __remove_duplicates_from_list_of_file_handles(self):
self.__list_of_file_handles = list(set(self.__list_of_file_handles))

def __send_existing_file_messages_to_queue(self):
for i in xrange(0, len(self.__list_of_file_messages)):
for i in range(0, len(self.__list_of_file_messages)):
self.__try_to_send_one_file_message(i)
msg = 'All file publication jobs handed to rabbit thread.'
logdebug(LOGGER, msg)
Expand Down
2 changes: 1 addition & 1 deletion esgfpid/assistant/unpublish.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ class AssistantAllVersions(AssistantOneVersion):

def __init__(self, **args):
super(AssistantAllVersions, self).__init__(**args)
if 'consumer_solr_url' not in args.keys():
if 'consumer_solr_url' not in list(args.keys()):
args['consumer_solr_url'] = None
self.__consumer_solr_url = args['consumer_solr_url']

Expand Down
22 changes: 11 additions & 11 deletions esgfpid/check.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@


import pika
import logging
import socket
Expand Down Expand Up @@ -78,7 +80,6 @@ def __fill_all_attributes(self, args):
self.__nodemanager = args['connector']._Connector__coupler._Coupler__rabbit_message_sender._RabbitMessageSender__node_manager
if args['print_to_console'] is not None and args['print_to_console'] == True:
self.__print_errors_to_console = True

if args['print_success_to_console'] is not None and args['print_success_to_console'] == True:
self.__print_success_to_console = True
if args['send_message'] is not None and args['send_message'] == True:
Expand Down Expand Up @@ -202,16 +203,16 @@ def __check_send_print_message(self, channel):

body = 'PLEASE PRINT: Testing pre-flight check...'
self.__loginfo(' .. checking message ...')
res = channel.basic_publish(
exchange=self.__nodemanager.get_exchange_name(),
routing_key=rkey,
body=body,
properties=props,
mandatory=True
)
if res:
try:
res = channel.basic_publish(
exchange=self.__nodemanager.get_exchange_name(),
routing_key=rkey,
body=body,
properties=props,
mandatory=True
)
self.__loginfo(' .. checking message ... ok.')
else:
except pika.exceptions.UnroutableError:
self.__loginfo(' .. checking message ... failed.')
self.__add_error_message_message_fail(rkey)
raise ValueError('Could not send message to messaging service host %s' %
Expand Down Expand Up @@ -341,4 +342,3 @@ def __logwarn(self, msg):
if self.__print_errors_to_console == True:
print(msg)
utils.logwarn(LOGGER, msg)

14 changes: 7 additions & 7 deletions esgfpid/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,13 +202,13 @@ def __check_rabbit_credentials_completeness(self, args):
raise esgfpid.exceptions.ArgumentError(errmsg)

# Mandatory:
self.__check_presence_and_type('url', credentials, basestring)
self.__check_presence_and_type('user', credentials, basestring)
self.__check_presence_and_type('password', credentials, basestring) # If you want open nodes to be enabled again, remove this!
self.__check_presence_and_type('url', credentials, str)
self.__check_presence_and_type('user', credentials, str)
self.__check_presence_and_type('password', credentials, str) # If you want open nodes to be enabled again, remove this!

# Optional:
self.__check_and_adapt_type_if_exists('password', credentials, basestring)
self.__check_and_adapt_type_if_exists('vhost', credentials, basestring)
self.__check_and_adapt_type_if_exists('password', credentials, str)
self.__check_and_adapt_type_if_exists('vhost', credentials, str)
self.__check_and_adapt_type_if_exists('port', credentials, int)
self.__check_and_adapt_type_if_exists('ssl_enabled', credentials, bool)

Expand Down Expand Up @@ -253,13 +253,13 @@ def __check_and_adapt_type_if_exists(self, attname, credentials, desiredtype):

def __try_conversion(self, value, desiredtype):
if desiredtype == bool:
if isinstance(value, basestring):
if isinstance(value, str):
if str.lower(value) == 'true':
return True
elif str.lower(value) == 'false':
return False
raise ValueError()
if desiredtype == basestring:
if desiredtype == str:
#return str(value)
raise ValueError('Not transforming booleans')
if desiredtype == int:
Expand Down
4 changes: 3 additions & 1 deletion esgfpid/example_usage.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@


import esgfpid
import logging
import datetime
Expand Down Expand Up @@ -46,7 +48,7 @@ def init_logging():
logging.basicConfig(level=LOGLEVEL, filename=filename, filemode='w', format=myformat)
pikalogger = logging.getLogger('pika')
pikalogger.setLevel(logging.WARNING)
print 'Logging to file ".%s"' % filename
print('Logging to file ".%s"' % filename)

def init_connector(data_node):
trusted_node1 = {
Expand Down
26 changes: 15 additions & 11 deletions esgfpid/rabbit/asynchronous/asynchronous.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,11 @@
'''

import Queue
import sys
if sys.version[0] == '2':
import Queue as queue
else:
import queue as queue
import threading
import pika
import time
Expand Down Expand Up @@ -70,7 +74,7 @@ def __init__(self, node_manager):

# Shared objects
self.__statemachine = StateMachine()
self.__unpublished_messages_queue = Queue.Queue()
self.__unpublished_messages_queue = queue.Queue()

# Log flags
self.__first_message_receival = True
Expand Down Expand Up @@ -173,7 +177,7 @@ def __join_and_rescue(self):
self.__rescue_leftovers()
else:
loginfo(LOGGER, 'Joining the thread failed once... Retrying.')
for i in xrange(20):
for i in range(20):
time.sleep(0.1) # blocking
self.__thread.add_event_force_finish()
success = self.__join(timeout_seconds)
Expand Down Expand Up @@ -398,7 +402,7 @@ def __trigger_one_publish_action(self):
def __trigger_n_publish_actions(self, num_messages_to_publish):
logdebug(LOGGER, 'Asking rabbit thread to publish %i messages...', num_messages_to_publish)
to_be_sure = 10
for i in xrange(num_messages_to_publish+to_be_sure):
for i in range(num_messages_to_publish+to_be_sure):
self.__thread.add_event_publish_message()


Expand Down Expand Up @@ -440,7 +444,7 @@ def __get_nacked_messages_as_list(self):
published once publishing definitely has ended.
The method iterates over the Queue and retrieves objects until
a Queue.Empty event occurs. As this event is not guaranteed
a queue.Empty event occurs. As this event is not guaranteed
to be true, the retrieval is then tried another time with
a little timeout to make sure all elements are retrieved.
Expand All @@ -463,22 +467,22 @@ def __get_unpublished_messages_as_list(self):
self.__get_msg_from_queue_and_store_first_try(
newlist,
self.__unpublished_messages_queue)
except Queue.Empty:
except queue.Empty:
try:
self.__get_a_msg_from_queue_and_store_second_try(
newlist,
self.__unpublished_messages_queue)
except Queue.Empty:
except queue.Empty:
break
return newlist

'''Put a message from the Queue to the list, without waiting.'''
def __get_msg_from_queue_and_store_first_try(self, alist, queue):
msg_incl_routing_key = queue.get(block=False)
def __get_msg_from_queue_and_store_first_try(self, alist, msg_queue):
msg_incl_routing_key = msg_queue.get(block=False)
alist.append(msg_incl_routing_key)

'''Put a message from the Queue to the list, with waiting.'''
def __get_a_msg_from_queue_and_store_second_try(self, alist, queue):
def __get_a_msg_from_queue_and_store_second_try(self, alist, msg_queue):
wait_seconds = 0.5
msg_incl_routing_key = queue.get(block=True, timeout=wait_seconds)
msg_incl_routing_key = msg_queue.get(block=True, timeout=wait_seconds)
alist.append(msg_incl_routing_key)
24 changes: 14 additions & 10 deletions esgfpid/rabbit/asynchronous/rabbitthread.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
import Queue
import sys
if sys.version[0] == '2':
import Queue as queue
else:
import queue as queue
import threading
import pika
import time
Expand Down Expand Up @@ -49,7 +53,7 @@
The RabbitThread provides some methods to be called by the main
thread, i.e. by the AsynchronousRabbitConnector, which use pika's
SelectConnection.add_timeout() to pass the event to the thread's
SelectConnection.ioloop.call_later() to pass the event to the thread's
blocking loop.
Only the methods
Expand All @@ -67,7 +71,7 @@
'''
class RabbitThread(threading.Thread):

def __init__(self, statemachine, queue, facade, node_manager):
def __init__(self, statemachine, msg_queue, facade, node_manager):
threading.Thread.__init__(self)

'''
Expand Down Expand Up @@ -107,7 +111,7 @@ def __init__(self, statemachine, queue, facade, node_manager):
will retrieve and publish them.
Shared with the main thread!
'''
self.__unpublished_messages_queue = queue
self.__unpublished_messages_queue = msg_queue

# These are only used by the rabbit thread:
'''
Expand Down Expand Up @@ -158,11 +162,11 @@ def __init__(self, statemachine, queue, facade, node_manager):
The fact that it is possible to increase it requires some precautions.
If messages can be scheduled to be sent to server, but not actually sent
yet makes it difficult to loop over the Queue of unsent messages, using
"while" and "Queue.Empty" to break it.
"while" and "queue.Empty" to break it.
(It will loop for too long and then have to call the publish method many
many times with no more messages to publish.)
If you ever decide to while-loop and break on Queue.Empty, just make sure
If you ever decide to while-loop and break on queue.Empty, just make sure
the publish interval is zero.
'''
self.__PUBLISH_INTERVAL_SECONDS = 0
Expand Down Expand Up @@ -272,7 +276,7 @@ def get_nacked_messages_as_list(self):
'''
def __add_event(self, event):
if self._connection is not None:
self._connection.add_timeout(self.__PUBLISH_INTERVAL_SECONDS, event)
self._connection.ioloop.call_later(self.__PUBLISH_INTERVAL_SECONDS, event)
else:
# If the main thread wants to add an event so quickly after starting the
# thread that not even the connection object is listening for events yet,
Expand All @@ -285,7 +289,7 @@ def __add_event(self, event):
logdebug(LOGGER, 'Main thread wants to add event to thread that is not ready to receive events yet. Blocking and waiting.')
self.__wait_for_thread_to_accept_events()
logdebug(LOGGER, 'Thread declared itself ready to receive events.')
self._connection.add_timeout(self.__PUBLISH_INTERVAL_SECONDS, event)
self._connection.ioloop.call_later(self.__PUBLISH_INTERVAL_SECONDS, event)
logerror(LOGGER, 'Added event after having waited for thread to open.')


Expand Down Expand Up @@ -348,9 +352,9 @@ def get_num_unpublished(self):
def get_num_unconfirmed(self):
return self.__confirmer.get_num_unconfirmed()

''' Called by feeder, to publish a message. May raise Queue.Empty. '''
''' Called by feeder, to publish a message. May raise queue.Empty. '''
def get_message_from_unpublished_stack(self, seconds):
return self.__unpublished_messages_queue.get(block=True, timeout=seconds) # can raise Queue.Empty
return self.__unpublished_messages_queue.get(block=True, timeout=seconds) # can raise queue.Empty

''' Called by feeder, to put a message back that was not successfully published. '''
def put_one_message_into_queue_of_unsent_messages(self, message):
Expand Down
Loading

0 comments on commit 0482a49

Please sign in to comment.