
How to do grid wide processing per partition? #123

Open
javafanboy opened this issue Apr 24, 2024 · 18 comments

@javafanboy

javafanboy commented Apr 24, 2024

The documentation describes quite clearly how to run processors of different kinds for a specific key, for all keys, or for each cache node, but what if I want to process the data in each partition separately?

Lacking any "out of the box" solution for this, my current idea is to use an AbstractInvocable that runs on each storage-enabled node, finds the partitions of the desired cache(s) on that node, and processes them one by one or in parallel (for instance using virtual threads).

I have so far not found how to determine the number of partitions on each node (not necessarily perfectly evenly balanced between nodes), or indeed how to get the actual keys/values for each partition :-(

So far I have written this code for my Invocable

InvocationService service = (InvocationService) getService();
Cluster cluster = service.getCluster();
// Look up the cache service by name and get its backing map manager context
Service cacheService = cluster.getService("CacheService");
if (cacheService instanceof CacheService) {
    BackingMapManagerContext context = ((CacheService) cacheService).getBackingMapManager().getContext();
    // ??????
}

Is this the right way to access what I want, and if so, how do I get further? I can't really find the right methods from here.

@javafanboy javafanboy added the RFA label Apr 24, 2024
@javafanboy javafanboy changed the title from "How to do grid wide processing per partitioin?" to "How to do grid wide processing per partition?" Apr 24, 2024
@thegridman
Member

The question would be what you intend to do with the BackingMapManagerContext you have.

You can run entry processors (or aggregators) against specific partitions using a PartitionedFilter; see https://docs.oracle.com/en/middleware/standalone/coherence/14.1.1.2206/java-reference/com/tangosol/util/filter/PartitionedFilter.html
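
For example, a minimal sketch of targeting a single partition this way (not from the thread itself; the cache name "my-cache", the choice of partition 0 and the Count aggregator are just placeholders):

NamedCache<String, String> cache = CacheFactory.getCache("my-cache");
PartitionedService service = (PartitionedService) cache.getCacheService();

// A PartitionSet sized to the service's partition count, with only partition 0 selected
PartitionSet parts = new PartitionSet(service.getPartitionCount());
parts.add(0);

// Wrap any filter (here AlwaysFilter) so it only matches entries owned by the selected partitions
PartitionedFilter filter = new PartitionedFilter(AlwaysFilter.INSTANCE, parts);

// Run an aggregator (or invokeAll with an entry processor) against just that partition
Integer countInPartition0 = cache.aggregate(filter, new Count<>());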

@javafanboy
Author

javafanboy commented Apr 24, 2024

Wow, super fast response "Gridman" - thanks!

Will investigate that direction further!

@thegridman
Member

Another approach to target a single partition is to create a class that implements PartitionAwareKey. You can then use that as the key to execute an entry processor. When you execute an EP, the entry does not have to actually exist, so this does not have to be the same as the actual cache keys. This can be useful to target an EP to a single partition, so you can create a PartitionAwareKey with a specific partition id, then from inside the EP, you know your partition and you can access whatever you want (entries, associated caches etc) from the EP.
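
A minimal sketch of such a key (assuming PartitionAwareKey here refers to the Coherence interface exposing getPartitionId() - the import path below is my assumption - and that plain Java serialization is acceptable for a dummy key):

import com.tangosol.net.partition.KeyPartitioningStrategy.PartitionAwareKey;

import java.io.Serializable;

// A "dummy" key whose only purpose is to pin an entry processor invocation to a chosen partition
public class PartitionPinKey implements PartitionAwareKey, Serializable {

    private final int partitionId;

    public PartitionPinKey(int partitionId) {
        this.partitionId = partitionId;
    }

    @Override
    public int getPartitionId() {
        return partitionId;
    }
}

The processor could then be run once per partition with something like cache.invoke(new PartitionPinKey(n), processor), without any entry for that key having to exist.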

@javafanboy
Author

That also sounds worth looking into. I actually have multiple use-cases on my "radar" so different ones may come in handy...

By the way, what is the simplest way to find the configured number of partitions programmatically?

@thegridman
Member

To get the partition count programmatically you need to be in a cluster member. Then you can get the service for the cache you are interested in, cast it to a PartitionedService and call getPartitionCount(). From a client you don't know the partition count directly, so you could do it via a simple EntryProcessor call that gets the partition count and returns it. As this EP only gets the partition count from the context in the entry, you can just invoke it against some fictitious non-existent key if you want.
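
A minimal sketch of both variants (the cache name and key are placeholders; the client-side version assumes a Coherence version where an entry processor can be written as a lambda, otherwise a tiny EntryProcessor class does the same):

// Inside a cluster member: cast the cache service
NamedCache<String, String> cache = CacheFactory.getCache("my-cache");
int partitionCount = ((PartitionedService) cache.getCacheService()).getPartitionCount();

// From a client: ask a storage node via a trivial entry processor against any key
// (the key does not have to exist; the EP only reads the service from the entry's context)
Integer remoteCount = cache.invoke("any-key", entry ->
        ((PartitionedService) entry.asBinaryEntry()
                                   .getContext()          // BackingMapManagerContext
                                   .getCacheService())
                .getPartitionCount());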

@javafanboy
Author

Another approach to target a single partition is to create a class that implements PartitionAwareKey. You can then use that as the key to execute an entry processor. When you execute an EP, the entry does not have to actually exist, so this does not have to be the same as the actual cache keys. This can be useful to target an EP to a single partition, so you can create a PartitionAwareKey with a specific partition id, then from inside the EP, you know your partition and you can access whatever you want (entries, associated caches etc) from the EP.

I created a special cache that will not contain any data and will only be used to invoke processors using "dummy" partition-aware keys (one for each configured partition), and using that I was able to invoke an entry processor once for each partition. In order to do something meaningful I do, however, need to get to the binary data of my real caches for each partition. Once again I am a bit lost in all the managers, contexts and other Coherence classes I can get to :-(

I assume I should turn the entry passed into the processor (invoked on the non-existing partition-aware key) into a binary entry (hopefully this is possible even for an entry that does not exist at this point!), but where do I go from there?

@javafanboy javafanboy reopened this Apr 24, 2024
@thegridman
Member

Which version of Coherence are you using? Some simpler ways to get related cache entries were added in newer versions.

@javafanboy
Author

javafanboy commented Apr 24, 2024 via email

@thegridman
Member

thegridman commented Apr 24, 2024

OK, here is a snippet for an entry processor that gets entries from another cache

import com.tangosol.util.Binary;
import com.tangosol.util.BinaryEntry;
import com.tangosol.util.InvocableMap;

public class MyProcessor<K, V, R>
        implements InvocableMap.EntryProcessor<K, V, R>
    {
    @Override
    public R process(InvocableMap.Entry<K, V> entry)
        {
        BinaryEntry<K, V> binaryEntry = entry.asBinaryEntry();
        String key = "foo";

        // Get the entry for update (the entry will now be locked)
        BinaryEntry<Binary, Object> otherEntry = binaryEntry.getAssociatedEntry("some-other-cache", key);

        // Get the entry for reading only (the entry will NOT be locked)
        BinaryEntry<Binary, Object> roEntry = binaryEntry.getAssociatedEntry("some-other-cache", key, true);

        return null;
        }
    }

  • You basically convert the entry to a BinaryEntry with entry.asBinaryEntry() so you can then get the contexts, etc.
  • Then, to get a related entry in the same partition in another cache, call binaryEntry.getAssociatedEntry(), which takes a cache name and the non-serialised key.
  • Or, if you know you only want to read that entry, get it read-only so it is not locked.

One thing to be aware of: if you intend to enlist multiple other cache entries, and you might have multiple instances of this entry processor executing that could try to enlist the same entries, you need to avoid deadlocks. Coherence will throw an exception if two EPs try to enlist the same entries in different orders, to avoid a deadlock. To be really safe, sort the keys you will enlist, so all EP instances lock keys in the same order; then they will just wait for each other. For read-only entries this does not matter, as they are not locked.
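
A sketch of that "sort before enlisting" advice inside an EP's process method (the cache name and keys are placeholders):

@Override
public R process(InvocableMap.Entry<K, V> entry) {
    BinaryEntry<K, V> binaryEntry = entry.asBinaryEntry();

    // Keys in the same partition of another cache that this EP wants to update
    List<String> keysToEnlist = new ArrayList<>(List.of("c", "a", "b"));

    // Enlist in a deterministic (sorted) order so that concurrent EP instances
    // lock the same keys in the same order and simply queue behind each other
    Collections.sort(keysToEnlist);

    for (String key : keysToEnlist) {
        BinaryEntry<?, ?> other = binaryEntry.getAssociatedEntry("some-other-cache", key);
        // ... mutate 'other' as needed ...
    }
    return null;
}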

@thegridman
Member

I updated the comment above because the new API to get a related entry, binaryEntry.getAssociatedEntry(), takes the key as a non-serialised value (whereas the other APIs on the BackingMapContext take Binary keys).

@javafanboy
Author

javafanboy commented Apr 24, 2024 via email

@thegridman
Member

Unfortunately this is one of those areas that has no documentation apart from the JavaDoc.

@javafanboy
Author

javafanboy commented Apr 26, 2024

As for your question about what I need to do in the processor: the project I am looking at first is the one I wrote a bit about in the discussions section, i.e. to perform an as-quick-as-possible dump/restore of all my partitioned caches to/from AWS S3 objects (one per partition), to enable fast creation of test environments with saved cache content.

Even though the Coherence snapshot feature exists and can be extended, I am thinking of, at least initially, just doing a quick POC to see how much faster one can dump and, more importantly, restore the partitioned backing maps of our near caches using low-level programming, compared to the traditional load from the database we do today. If this turns out as fast as I hope, I can look into whether it would make sense to integrate it with the Coherence snapshot framework (that would be nice but may require too much work).

My thinking for this POC is so far as follows:

  1. To dump: create a processor that, for each partition, processes the binary keys and values of each partitioned cache (the names are passed in as arguments to the processor when it is constructed) and streams the data to an S3 object. To separate the data from different caches I could just write a header with the cache name and the number of binary entries between each.
  2. To restore: basically do the opposite, i.e. create a processor that, for each (at this stage empty) partition, reads back the dumped binary entries from an S3 object into the respective cache's backing map for that partition.

From the examples you have shared I now know how to get hold of individual entries, but I still do not know how to get hold of:

  1. A read-only map of each cache's data in a partition, so that I can iterate it and get all the binary entries.
  2. A writable view of each cache in a partition that I can use to insert the restored binary entries.

I have seen that from a binary entry I can get its backing map (but this method is deprecated, so not good to rely on going forward), and this is only for the current cache. As I do not necessarily know any key in the other caches, I can't use the "associatedEntry" method as a first step to get hold of the other backing maps...

Also, for the restore there are not yet any entries in any of the caches, and once again I do not, in the general case, know any keys that belong to a particular partition, so I need a way to get hold of a mutable backing map without a key.

My hypothesis is that this should be a lot faster than what we do today, where we:

  1. Prepare a new database schema for the test environment, loading one of several pre-prepared snapshots.
  2. Once this is completed, load this data into the back tier of our near cache using multiple threads, but as Java objects that are first created from the database table format, then POF-serialized and sent over the wire to the back cache.

With the new method we would:

  • Have no need to wait for the database schema to be loaded, i.e. the database snapshot and the corresponding saved cache content can be loaded in parallel.
  • Bypass both "ORM" (Java object creation from database) and POF serialization.
  • Make full use of both the network capacity and the vCPUs of all the storage-enabled nodes by loading from multiple S3 objects in parallel (one for each partition) - this should allow a much higher aggregate network speed than loading from our, not that beefy, database server. S3 may be a bit slow to deliver the first byte, but once the transfer starts it should be possible, by reading multiple objects in parallel, to saturate the network interface even with not that many cores in each node.

@thegridman
Member

The biggest issue I can see is what happens if, while you are creating or restoring your own "snapshot", other requests come in to read or mutate data. They will either fail or read inconsistent data. The Coherence snapshot process suspends the cache service while data is being snapshotted or restored, so there can be no requests while this is in progress. Although there is a management API that allows you to suspend and resume a service, you could not use it here, because once the service is suspended you would not be able to execute your entry processors against it.
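
For reference, a rough sketch of the suspend/resume management API mentioned above (the service name is a placeholder); as noted, it does not help here because entry processors cannot run against the service while it is suspended:

Cluster cluster = CacheFactory.ensureCluster();

// Quiesce the distributed cache service: no requests are processed while it is suspended
cluster.suspendService("PartitionedCache");
try {
    // ... out-of-band work would go here, but EPs against the suspended service cannot run ...
}
finally {
    cluster.resumeService("PartitionedCache");
}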

@javafanboy
Author

javafanboy commented Apr 26, 2024 via email

@javafanboy
Author

javafanboy commented Apr 30, 2024

I completed a first naive implementation of the first part of the MVP, creating a snapshot of a single partitioned cache into S3: using a partition filter I sent an EntryProcessor (returning null) to each partition, which used the S3 streaming support to upload the data in a buffered fashion (to avoid having to create a possibly large byte array of the content of a partition, which, if done in parallel, would most likely result in an OOM).

As I understand it, I need to make one call to cache.invokeAll for each partition, or else the processor will only be invoked once on each node with the entries from all the partitions on that node, so right now I spawn a virtual thread for each partition (I run with 31 partitions right now), each making its own cache.invokeAll call.
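
A minimal sketch of that fan-out (SnapshotToS3Processor is a hypothetical processor that streams the entries it is given to S3 and returns null; the snippet assumes it runs somewhere the partition count can be read from the service, e.g. on a cluster member, and on Java 21 for virtual threads):

NamedCache<String, String> cache = CacheFactory.getCache("my-cache");
PartitionedService service = (PartitionedService) cache.getCacheService();
int partitionCount = service.getPartitionCount();

try (ExecutorService pool = Executors.newVirtualThreadPerTaskExecutor()) {
    for (int p = 0; p < partitionCount; p++) {
        PartitionSet parts = new PartitionSet(partitionCount);
        parts.add(p);

        // One invokeAll per partition, so each invocation covers exactly one partition's entries
        PartitionedFilter filter = new PartitionedFilter(AlwaysFilter.INSTANCE, parts);
        pool.submit(() -> cache.invokeAll(filter, new SnapshotToS3Processor()));
    }
}   // closing the executor waits for all submitted tasks to finish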

The performance, even with this first unoptimized way of uploading the data, is great - running locally on my laptop (not even in AWS) I easily bottlenecked my 1 Gbit Internet connection (from Sweden to Ireland) with the parallel S3 object uploads, and I suspect this will be significantly better in AWS, where my VMs/containers will have ~5 Gbit EACH and network latency will be a lot lower.

Ideally, however, I would have liked to instead send, in parallel, a single processor of some kind to each storage-enabled node and have it upload one S3 object for each partition owned by that node covering ALL partitioned caches (or perhaps just the caches specified in the constructor of the processor) - that way multiple, possibly small, caches would not each produce a large number of S3 objects - but I have not figured out if this is even possible...

Next I need to restore the S3 objects, and I am still trying to figure out how to do this - for the restore I would not necessarily (unless, say, locking in Coherence requires it) invoke one processor per partition - rather, I would like to once again invoke one processor per storage-enabled node that, in parallel, streams the persisted binary keys/values from the S3 objects corresponding to the owned partitions and inserts them directly into the backing maps.

The methods for retrieving backing maps that I have found so far seem to be deprecated in favor of methods working with single entries, so I have yet to figure out how to insert binary entries into an empty cache backing map...

Any suggestions are warmly appreciated!

@javafanboy
Author

I am continuing the prototype implementation using the deprecated method getBackingMap, as it is perfect for (and even seems necessary for) this use case, in the hope that it will not be removed without being replaced by something similar but better (a rough sketch of the idea follows the list below):

  1. I need to perform operations in bulk on multiple caches.
  2. I will only run my code when there is no other mutation of the caches going on, so it is not a problem that no locking is provided (I assume this is the main reason the method is deprecated). In fact, for my purpose, it is an advantage that no locking is performed, as this will improve performance :-)
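
A rough sketch of the kind of restore step this implies, combining the per-partition EP described earlier with the deprecated getBackingMap call (the cache name and the readPartitionDumpFromS3 helper are hypothetical, and writing Binary pairs straight into a backing map is exactly the unsupported, no-locking territory discussed above):

// Inside the per-partition EP, where 'entry' is the dummy partition-aware entry
BinaryEntry<?, ?> binaryEntry = entry.asBinaryEntry();
BackingMapManagerContext ctx = binaryEntry.getContext();
int partition = ctx.getKeyPartition(binaryEntry.getBinaryKey());

// Backing map of another cache on this member (deprecated API, no locking performed)
BackingMapContext otherCacheCtx = ctx.getBackingMapContext("some-other-cache");
Map<Object, Object> backingMap = otherCacheCtx.getBackingMap();

// readPartitionDumpFromS3 is a placeholder for whatever streams back the dumped
// Binary key/value pairs for this partition from its S3 object
for (Map.Entry<Binary, Binary> dumped : readPartitionDumpFromS3(partition)) {
    // The keys must of course belong to a partition owned by this member
    backingMap.put(dumped.getKey(), dumped.getValue());
}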

As a side note, it seems an odd choice that getAssociatedEntry requires a non-serialized key - this would really slow things down in my case, as I am currently operating only on serialized keys and values, and having to de-serialize all the keys of the whole cache just to satisfy this method would be unfortunate...

By the way, I will take a break from working on this due to vacation travel, but will get back to it after that :-)

@mgamanho
Contributor

mgamanho commented May 6, 2024

Hi,

are you sure you don't want to just use a SnapshotArchiver? It does what you describe... You wouldn't have to wrestle with partition ownership, handle entry/key associations for per-member processing, iterate through backing maps, or deal with serialization/conversion, among other things. Just give it a shot and see if it meets your SLAs.

Otherwise, doing what you describe in an efficient manner would likely require going down to the message level. See how BinaryMap works; it is the client for all distributed cache operations. Be aware that using internal code will expose you to possible changes we make - for you, that means possible reverse-engineering sessions and rewrites.

It seems you already have the S3 downstream processing down; you're almost there.
