diff --git a/java/org/apache/catalina/tribes/io/BufferPool.java b/java/org/apache/catalina/tribes/io/BufferPool.java index 29e8804f9b77..7e4736ae36ad 100644 --- a/java/org/apache/catalina/tribes/io/BufferPool.java +++ b/java/org/apache/catalina/tribes/io/BufferPool.java @@ -27,8 +27,9 @@ public class BufferPool { private static final Log log = LogFactory.getLog(BufferPool.class); - public static final int DEFAULT_POOL_SIZE = - Integer.getInteger("org.apache.catalina.tribes.io.BufferPool.DEFAULT_POOL_SIZE", 100*1024*1024).intValue(); //100 MiB + public static final int DEFAULT_POOL_SIZE = Integer + .getInteger("org.apache.catalina.tribes.io.BufferPool.DEFAULT_POOL_SIZE", 100 * 1024 * 1024).intValue(); // 100 + // MiB protected static final StringManager sm = StringManager.getManager(BufferPool.class); @@ -40,8 +41,7 @@ public static BufferPool getBufferPool() { if (instance == null) { BufferPool pool = new BufferPool(); pool.setMaxSize(DEFAULT_POOL_SIZE); - log.info(sm.getString("bufferPool.created", - Integer.toString(DEFAULT_POOL_SIZE), + log.info(sm.getString("bufferPool.created", Integer.toString(DEFAULT_POOL_SIZE), pool.getClass().getName())); instance = pool; } @@ -55,12 +55,12 @@ private BufferPool() { public XByteBuffer getBuffer(int minSize, boolean discard) { XByteBuffer buffer = queue.poll(); - if ( buffer != null ) { + if (buffer != null) { size.addAndGet(-buffer.getCapacity()); } - if ( buffer == null ) { - buffer = new XByteBuffer(minSize,discard); - } else if ( buffer.getCapacity() <= minSize ) { + if (buffer == null) { + buffer = new XByteBuffer(minSize, discard); + } else if (buffer.getCapacity() <= minSize) { buffer.expand(minSize); } buffer.setDiscard(discard); @@ -69,7 +69,7 @@ public XByteBuffer getBuffer(int minSize, boolean discard) { } public void returnBuffer(XByteBuffer buffer) { - if ( (size.get() + buffer.getCapacity()) <= maxSize ) { + if ((size.get() + buffer.getCapacity()) <= maxSize) { size.addAndGet(buffer.getCapacity()); queue.offer(buffer); } @@ -82,8 +82,7 @@ public void clear() { protected int maxSize; protected final AtomicInteger size = new AtomicInteger(0); - protected final ConcurrentLinkedQueue queue = - new ConcurrentLinkedQueue<>(); + protected final ConcurrentLinkedQueue queue = new ConcurrentLinkedQueue<>(); public void setMaxSize(int bytes) { this.maxSize = bytes; diff --git a/java/org/apache/catalina/tribes/io/ChannelData.java b/java/org/apache/catalina/tribes/io/ChannelData.java index 8190f3279875..9f94520dd6b0 100644 --- a/java/org/apache/catalina/tribes/io/ChannelData.java +++ b/java/org/apache/catalina/tribes/io/ChannelData.java @@ -26,10 +26,10 @@ import org.apache.catalina.tribes.util.UUIDGenerator; /** - * The ChannelData object is used to transfer a message through the - * channel interceptor stack and eventually out on a transport to be sent - * to another node. While the message is being processed by the different + * The ChannelData object is used to transfer a message through the channel interceptor stack and + * eventually out on a transport to be sent to another node. While the message is being processed by the different * interceptors, the message data can be manipulated as each interceptor seems appropriate. + * * @author Peter Rossbach */ public class ChannelData implements ChannelMessage { @@ -42,19 +42,19 @@ public class ChannelData implements ChannelMessage { /** * The options this message was sent with */ - private int options = 0 ; + private int options = 0; /** * The message data, stored in a dynamic buffer */ - private XByteBuffer message ; + private XByteBuffer message; /** * The timestamp that goes with this message */ - private long timestamp ; + private long timestamp; /** * A unique message id */ - private byte[] uniqueId ; + private byte[] uniqueId; /** * The source or reply-to address for this message */ @@ -62,6 +62,7 @@ public class ChannelData implements ChannelMessage { /** * Creates an empty channel data with a new unique Id + * * @see #ChannelData(boolean) */ public ChannelData() { @@ -70,10 +71,11 @@ public ChannelData() { /** * Create an empty channel data object + * * @param generateUUID boolean - if true, a unique Id will be generated */ public ChannelData(boolean generateUUID) { - if ( generateUUID ) { + if (generateUUID) { generateUUID(); } } @@ -81,8 +83,9 @@ public ChannelData(boolean generateUUID) { /** * Creates a new channel data object with data - * @param uniqueId - unique message id - * @param message - message data + * + * @param uniqueId - unique message id + * @param message - message data * @param timestamp - message timestamp */ public ChannelData(byte[] uniqueId, XByteBuffer message, long timestamp) { @@ -145,127 +148,131 @@ public void setAddress(Member address) { */ public void generateUUID() { byte[] data = new byte[16]; - UUIDGenerator.randomUUID(USE_SECURE_RANDOM_FOR_UUID,data,0); + UUIDGenerator.randomUUID(USE_SECURE_RANDOM_FOR_UUID, data, 0); setUniqueId(data); } public int getDataPackageLength() { - int length = - 4 + //options - 8 + //timestamp off=4 - 4 + //unique id length off=12 - uniqueId.length+ //id data off=12+uniqueId.length - 4 + //addr length off=12+uniqueId.length+4 - address.getDataLength()+ //member data off=12+uniqueId.length+4+add.length - 4 + //message length off=12+uniqueId.length+4+add.length+4 - message.getLength(); + int length = 4 + // options + 8 + // timestamp off=4 + 4 + // unique id length off=12 + uniqueId.length + // id data off=12+uniqueId.length + 4 + // addr length off=12+uniqueId.length+4 + address.getDataLength() + // member data off=12+uniqueId.length+4+add.length + 4 + // message length off=12+uniqueId.length+4+add.length+4 + message.getLength(); return length; } /** * Serializes the ChannelData object into a byte[] array + * * @return byte[] */ - public byte[] getDataPackage() { + public byte[] getDataPackage() { int length = getDataPackageLength(); byte[] data = new byte[length]; int offset = 0; - return getDataPackage(data,offset); + return getDataPackage(data, offset); } - public byte[] getDataPackage(byte[] data, int offset) { + public byte[] getDataPackage(byte[] data, int offset) { byte[] addr = address.getData(false); - XByteBuffer.toBytes(options,data,offset); - offset += 4; //options - XByteBuffer.toBytes(timestamp,data,offset); - offset += 8; //timestamp - XByteBuffer.toBytes(uniqueId.length,data,offset); - offset += 4; //uniqueId.length - System.arraycopy(uniqueId,0,data,offset,uniqueId.length); - offset += uniqueId.length; //uniqueId data - XByteBuffer.toBytes(addr.length,data,offset); - offset += 4; //addr.length - System.arraycopy(addr,0,data,offset,addr.length); - offset += addr.length; //addr data - XByteBuffer.toBytes(message.getLength(),data,offset); - offset += 4; //message.length - System.arraycopy(message.getBytesDirect(),0,data,offset,message.getLength()); + XByteBuffer.toBytes(options, data, offset); + offset += 4; // options + XByteBuffer.toBytes(timestamp, data, offset); + offset += 8; // timestamp + XByteBuffer.toBytes(uniqueId.length, data, offset); + offset += 4; // uniqueId.length + System.arraycopy(uniqueId, 0, data, offset, uniqueId.length); + offset += uniqueId.length; // uniqueId data + XByteBuffer.toBytes(addr.length, data, offset); + offset += 4; // addr.length + System.arraycopy(addr, 0, data, offset, addr.length); + offset += addr.length; // addr data + XByteBuffer.toBytes(message.getLength(), data, offset); + offset += 4; // message.length + System.arraycopy(message.getBytesDirect(), 0, data, offset, message.getLength()); return data; } /** * Deserializes a ChannelData object from a byte array + * * @param xbuf byte[] + * * @return ChannelData */ - public static ChannelData getDataFromPackage(XByteBuffer xbuf) { + public static ChannelData getDataFromPackage(XByteBuffer xbuf) { ChannelData data = new ChannelData(false); int offset = 0; - data.setOptions(XByteBuffer.toInt(xbuf.getBytesDirect(),offset)); - offset += 4; //options - data.setTimestamp(XByteBuffer.toLong(xbuf.getBytesDirect(),offset)); - offset += 8; //timestamp - data.uniqueId = new byte[XByteBuffer.toInt(xbuf.getBytesDirect(),offset)]; - offset += 4; //uniqueId length - System.arraycopy(xbuf.getBytesDirect(),offset,data.uniqueId,0,data.uniqueId.length); - offset += data.uniqueId.length; //uniqueId data - //byte[] addr = new byte[XByteBuffer.toInt(xbuf.getBytesDirect(),offset)]; - int addrlen = XByteBuffer.toInt(xbuf.getBytesDirect(),offset); - offset += 4; //addr length - //System.arraycopy(xbuf.getBytesDirect(),offset,addr,0,addr.length); - data.setAddress(MemberImpl.getMember(xbuf.getBytesDirect(),offset,addrlen)); - //offset += addr.length; //addr data + data.setOptions(XByteBuffer.toInt(xbuf.getBytesDirect(), offset)); + offset += 4; // options + data.setTimestamp(XByteBuffer.toLong(xbuf.getBytesDirect(), offset)); + offset += 8; // timestamp + data.uniqueId = new byte[XByteBuffer.toInt(xbuf.getBytesDirect(), offset)]; + offset += 4; // uniqueId length + System.arraycopy(xbuf.getBytesDirect(), offset, data.uniqueId, 0, data.uniqueId.length); + offset += data.uniqueId.length; // uniqueId data + // byte[] addr = new byte[XByteBuffer.toInt(xbuf.getBytesDirect(),offset)]; + int addrlen = XByteBuffer.toInt(xbuf.getBytesDirect(), offset); + offset += 4; // addr length + // System.arraycopy(xbuf.getBytesDirect(),offset,addr,0,addr.length); + data.setAddress(MemberImpl.getMember(xbuf.getBytesDirect(), offset, addrlen)); + // offset += addr.length; //addr data offset += addrlen; - int xsize = XByteBuffer.toInt(xbuf.getBytesDirect(),offset); - offset += 4; //xsize length - System.arraycopy(xbuf.getBytesDirect(),offset,xbuf.getBytesDirect(),0,xsize); + int xsize = XByteBuffer.toInt(xbuf.getBytesDirect(), offset); + offset += 4; // xsize length + System.arraycopy(xbuf.getBytesDirect(), offset, xbuf.getBytesDirect(), 0, xsize); xbuf.setLength(xsize); data.message = xbuf; return data; } - public static ChannelData getDataFromPackage(byte[] b) { + public static ChannelData getDataFromPackage(byte[] b) { ChannelData data = new ChannelData(false); int offset = 0; - data.setOptions(XByteBuffer.toInt(b,offset)); - offset += 4; //options - data.setTimestamp(XByteBuffer.toLong(b,offset)); - offset += 8; //timestamp - data.uniqueId = new byte[XByteBuffer.toInt(b,offset)]; - offset += 4; //uniqueId length - System.arraycopy(b,offset,data.uniqueId,0,data.uniqueId.length); - offset += data.uniqueId.length; //uniqueId data - byte[] addr = new byte[XByteBuffer.toInt(b,offset)]; - offset += 4; //addr length - System.arraycopy(b,offset,addr,0,addr.length); + data.setOptions(XByteBuffer.toInt(b, offset)); + offset += 4; // options + data.setTimestamp(XByteBuffer.toLong(b, offset)); + offset += 8; // timestamp + data.uniqueId = new byte[XByteBuffer.toInt(b, offset)]; + offset += 4; // uniqueId length + System.arraycopy(b, offset, data.uniqueId, 0, data.uniqueId.length); + offset += data.uniqueId.length; // uniqueId data + byte[] addr = new byte[XByteBuffer.toInt(b, offset)]; + offset += 4; // addr length + System.arraycopy(b, offset, addr, 0, addr.length); data.setAddress(MemberImpl.getMember(addr)); - offset += addr.length; //addr data - int xsize = XByteBuffer.toInt(b,offset); - //data.message = new XByteBuffer(new byte[xsize],false); - data.message = BufferPool.getBufferPool().getBuffer(xsize,false); - offset += 4; //message length - System.arraycopy(b,offset,data.message.getBytesDirect(),0,xsize); - data.message.append(b,offset,xsize); - offset += xsize; //message data + offset += addr.length; // addr data + int xsize = XByteBuffer.toInt(b, offset); + // data.message = new XByteBuffer(new byte[xsize],false); + data.message = BufferPool.getBufferPool().getBuffer(xsize, false); + offset += 4; // message length + System.arraycopy(b, offset, data.message.getBytesDirect(), 0, xsize); + data.message.append(b, offset, xsize); + offset += xsize; // message data return data; } @Override public int hashCode() { - return XByteBuffer.toInt(getUniqueId(),0); + return XByteBuffer.toInt(getUniqueId(), 0); } /** * Compares to ChannelData objects, only compares on getUniqueId().equals(o.getUniqueId()) + * * @param o Object + * * @return boolean */ @Override public boolean equals(Object o) { - if ( o instanceof ChannelData ) { - return Arrays.equals(getUniqueId(),((ChannelData)o).getUniqueId()); + if (o instanceof ChannelData) { + return Arrays.equals(getUniqueId(), ((ChannelData) o).getUniqueId()); } else { return false; } @@ -273,6 +280,7 @@ public boolean equals(Object o) { /** * Create a shallow clone, only the data gets recreated + * * @return ClusterData */ @Override @@ -285,7 +293,7 @@ public ChannelData clone() { throw new AssertionError(); } if (this.message != null) { - clone.message = new XByteBuffer(this.message.getBytesDirect(),false); + clone.message = new XByteBuffer(this.message.getBytesDirect(), false); } return clone; } @@ -297,30 +305,36 @@ public Object deepclone() { } /** - * Utility method, returns true if the options flag indicates that an ack - * is to be sent after the message has been received and processed + * Utility method, returns true if the options flag indicates that an ack is to be sent after the message has been + * received and processed + * * @param options int - the options for the message + * * @return boolean + * * @see org.apache.catalina.tribes.Channel#SEND_OPTIONS_USE_ACK * @see org.apache.catalina.tribes.Channel#SEND_OPTIONS_SYNCHRONIZED_ACK */ public static boolean sendAckSync(int options) { - return ( (Channel.SEND_OPTIONS_USE_ACK & options) == Channel.SEND_OPTIONS_USE_ACK) && - ( (Channel.SEND_OPTIONS_SYNCHRONIZED_ACK & options) == Channel.SEND_OPTIONS_SYNCHRONIZED_ACK); + return ((Channel.SEND_OPTIONS_USE_ACK & options) == Channel.SEND_OPTIONS_USE_ACK) && + ((Channel.SEND_OPTIONS_SYNCHRONIZED_ACK & options) == Channel.SEND_OPTIONS_SYNCHRONIZED_ACK); } /** - * Utility method, returns true if the options flag indicates that an ack - * is to be sent after the message has been received but not yet processed + * Utility method, returns true if the options flag indicates that an ack is to be sent after the message has been + * received but not yet processed + * * @param options int - the options for the message + * * @return boolean + * * @see org.apache.catalina.tribes.Channel#SEND_OPTIONS_USE_ACK * @see org.apache.catalina.tribes.Channel#SEND_OPTIONS_SYNCHRONIZED_ACK */ public static boolean sendAckAsync(int options) { - return ( (Channel.SEND_OPTIONS_USE_ACK & options) == Channel.SEND_OPTIONS_USE_ACK) && - ( (Channel.SEND_OPTIONS_SYNCHRONIZED_ACK & options) != Channel.SEND_OPTIONS_SYNCHRONIZED_ACK); + return ((Channel.SEND_OPTIONS_USE_ACK & options) == Channel.SEND_OPTIONS_USE_ACK) && + ((Channel.SEND_OPTIONS_SYNCHRONIZED_ACK & options) != Channel.SEND_OPTIONS_SYNCHRONIZED_ACK); } @Override @@ -334,9 +348,9 @@ public String toString() { } public static String bToS(byte[] data) { - StringBuilder buf = new StringBuilder(4*16); + StringBuilder buf = new StringBuilder(4 * 16); buf.append('{'); - for (int i=0; data!=null && iXByteBuffer until a full package has been received. - * This object uses an XByteBuffer which is an extendable object buffer that also allows - * for message encoding and decoding. + * The object reader object is an object used in conjunction with java.nio TCP messages. This object stores the message + * bytes in a XByteBuffer until a full package has been received. This object uses an XByteBuffer which is + * an extendable object buffer that also allows for message encoding and decoding. */ public class ObjectReader { @@ -51,8 +48,10 @@ public class ObjectReader { public ObjectReader(int packetSize) { this.buffer = new XByteBuffer(packetSize, true); } + /** * Creates an ObjectReader for a TCP NIO socket channel + * * @param channel - the channel to be read. */ public ObjectReader(SocketChannel channel) { @@ -61,13 +60,14 @@ public ObjectReader(SocketChannel channel) { /** * Creates an ObjectReader for a TCP socket + * * @param socket Socket */ public ObjectReader(Socket socket) { - try{ + try { this.buffer = new XByteBuffer(socket.getReceiveBufferSize(), true); - }catch ( IOException x ) { - //unable to get buffer size + } catch (IOException x) { + // unable to get buffer size log.warn(sm.getString("objectReader.retrieveFailed.socketReceiverBufferSize", Integer.toString(Constants.DEFAULT_CLUSTER_MSG_BUFFER_SIZE))); this.buffer = new XByteBuffer(Constants.DEFAULT_CLUSTER_MSG_BUFFER_SIZE, true); @@ -90,33 +90,35 @@ public synchronized boolean isAccessed() { /** * Append new bytes to buffer. + * * @see XByteBuffer#countPackages() - * @param data new transfer buffer - * @param len length in buffer + * + * @param data new transfer buffer + * @param len length in buffer * @param count whether to return the count + * * @return number of messages that was sent to callback (or -1 if count == false) */ public int append(ByteBuffer data, int len, boolean count) { - buffer.append(data,len); - int pkgCnt = -1; - if ( count ) { - pkgCnt = buffer.countPackages(); + buffer.append(data, len); + int pkgCnt = -1; + if (count) { + pkgCnt = buffer.countPackages(); + } + return pkgCnt; } - return pkgCnt; - } - public int append(byte[] data,int off,int len, boolean count) { - buffer.append(data,off,len); + public int append(byte[] data, int off, int len, boolean count) { + buffer.append(data, off, len); int pkgCnt = -1; - if ( count ) { + if (count) { pkgCnt = buffer.countPackages(); } return pkgCnt; } /** - * Send buffer to cluster listener (callback). - * Is message complete receiver send message to callback? + * Send buffer to cluster listener (callback). Is message complete receiver send message to callback? * * @see org.apache.catalina.tribes.transport.ReceiverBase#messageDataReceived(ChannelMessage) * @see XByteBuffer#doesPackageExist() @@ -127,7 +129,7 @@ public int append(byte[] data,int off,int len, boolean count) { public ChannelMessage[] execute() { int pkgCnt = buffer.countPackages(); ChannelMessage[] result = new ChannelMessage[pkgCnt]; - for (int i=0; i0; + return buffer.countPackages(true) > 0; } + /** * Returns the number of packages that the reader has read + * * @return int */ public int count() { diff --git a/java/org/apache/catalina/tribes/io/ReplicationStream.java b/java/org/apache/catalina/tribes/io/ReplicationStream.java index 0cfeb6519967..be163196b2f1 100644 --- a/java/org/apache/catalina/tribes/io/ReplicationStream.java +++ b/java/org/apache/catalina/tribes/io/ReplicationStream.java @@ -26,9 +26,8 @@ import org.apache.catalina.tribes.util.StringManager; /** - * Custom subclass of ObjectInputStream that loads from the - * class loader for this web application. This allows classes defined only - * with the web application to be found correctly. + * Custom subclass of ObjectInputStream that loads from the class loader for this web application. This + * allows classes defined only with the web application to be found correctly. * * @author Craig R. McClanahan * @author Bip Thelin @@ -45,31 +44,28 @@ public final class ReplicationStream extends ObjectInputStream { /** * Construct a new instance of CustomObjectInputStream * - * @param stream The input stream we will read from + * @param stream The input stream we will read from * @param classLoaders The class loader array used to instantiate objects * * @exception IOException if an input/output error occurs */ - public ReplicationStream(InputStream stream, - ClassLoader[] classLoaders) - throws IOException { + public ReplicationStream(InputStream stream, ClassLoader[] classLoaders) throws IOException { super(stream); this.classLoaders = classLoaders; } /** - * Load the local class equivalent of the specified stream class - * description, by using the class loader assigned to this Context. + * Load the local class equivalent of the specified stream class description, by using the class loader assigned to + * this Context. * * @param classDesc Class description from the input stream * * @exception ClassNotFoundException if this class cannot be found - * @exception IOException if an input/output error occurs + * @exception IOException if an input/output error occurs */ @Override - public Class resolveClass(ObjectStreamClass classDesc) - throws ClassNotFoundException, IOException { + public Class resolveClass(ObjectStreamClass classDesc) throws ClassNotFoundException, IOException { String name = classDesc.getName(); try { return resolveClass(name); @@ -81,7 +77,7 @@ public Class resolveClass(ObjectStreamClass classDesc) public Class resolveClass(String name) throws ClassNotFoundException { boolean tryRepFirst = name.startsWith("org.apache.catalina.tribes"); - try { + try { if (tryRepFirst) { return findReplicationClass(name); } else { @@ -97,12 +93,11 @@ public Class resolveClass(String name) throws ClassNotFoundException { } /** - * ObjectInputStream.resolveProxyClass has some funky way of using - * the incorrect class loader to resolve proxy classes, let's do it our way instead + * ObjectInputStream.resolveProxyClass has some funky way of using the incorrect class loader to resolve proxy + * classes, let's do it our way instead */ @Override - protected Class resolveProxyClass(String[] interfaces) - throws IOException, ClassNotFoundException { + protected Class resolveProxyClass(String[] interfaces) throws IOException, ClassNotFoundException { ClassLoader latestLoader; if (classLoaders.length > 0) { @@ -117,14 +112,13 @@ protected Class resolveProxyClass(String[] interfaces) Class[] classObjs = new Class[interfaces.length]; for (int i = 0; i < interfaces.length; i++) { Class cl = this.resolveClass(interfaces[i]); - if (latestLoader==null) { + if (latestLoader == null) { latestLoader = cl.getClassLoader(); } if ((cl.getModifiers() & Modifier.PUBLIC) == 0) { if (hasNonPublicInterface) { if (nonPublicLoader != cl.getClassLoader()) { - throw new IllegalAccessError( - sm.getString("replicationStream.conflict")); + throw new IllegalAccessError(sm.getString("replicationStream.conflict")); } } else { nonPublicLoader = cl.getClassLoader(); @@ -136,8 +130,8 @@ protected Class resolveProxyClass(String[] interfaces) try { // No way to avoid this at the moment @SuppressWarnings("deprecation") - Class proxyClass = Proxy.getProxyClass(hasNonPublicInterface ? nonPublicLoader - : latestLoader, classObjs); + Class proxyClass = + Proxy.getProxyClass(hasNonPublicInterface ? nonPublicLoader : latestLoader, classObjs); return proxyClass; } catch (IllegalArgumentException e) { throw new ClassNotFoundException(null, e); @@ -145,13 +139,12 @@ protected Class resolveProxyClass(String[] interfaces) } - public Class findReplicationClass(String name) - throws ClassNotFoundException { + public Class findReplicationClass(String name) throws ClassNotFoundException { Class clazz = Class.forName(name, false, getClass().getClassLoader()); return clazz; } - public Class findExternalClass(String name) throws ClassNotFoundException { + public Class findExternalClass(String name) throws ClassNotFoundException { ClassNotFoundException cnfe = null; for (ClassLoader classLoader : classLoaders) { try { @@ -161,7 +154,7 @@ public Class findExternalClass(String name) throws ClassNotFoundException { cnfe = x; } } - if ( cnfe != null ) { + if (cnfe != null) { throw cnfe; } else { throw new ClassNotFoundException(name); @@ -169,7 +162,7 @@ public Class findExternalClass(String name) throws ClassNotFoundException { } @Override - public void close() throws IOException { + public void close() throws IOException { this.classLoaders = null; super.close(); } diff --git a/java/org/apache/catalina/tribes/io/XByteBuffer.java b/java/org/apache/catalina/tribes/io/XByteBuffer.java index 00bd3b43fdcd..b3bf65731030 100644 --- a/java/org/apache/catalina/tribes/io/XByteBuffer.java +++ b/java/org/apache/catalina/tribes/io/XByteBuffer.java @@ -31,19 +31,17 @@ import org.apache.juli.logging.LogFactory; /** - * The XByteBuffer provides a dual functionality. - * One, it stores message bytes and automatically extends the byte buffer if needed.
- * Two, it can encode and decode packages so that they can be defined and identified - * as they come in on a socket. - *
+ * The XByteBuffer provides a dual functionality. One, it stores message bytes and automatically extends the byte buffer + * if needed.
+ * Two, it can encode and decode packages so that they can be defined and identified as they come in on a socket.
* THIS CLASS IS NOT THREAD SAFE
*
* Transfer package: *
    *
  • START_DATA- 7 bytes - FLT2002
  • - *
  • SIZE - 4 bytes - size of the data package
  • - *
  • DATA - should be as many bytes as the prev SIZE
  • - *
  • END_DATA - 7 bytes - TLF2003
  • + *
  • SIZE - 4 bytes - size of the data package
  • + *
  • DATA - should be as many bytes as the prev SIZE
  • + *
  • END_DATA - 7 bytes - TLF2003
  • *
*/ public class XByteBuffer implements Serializable { @@ -56,12 +54,12 @@ public class XByteBuffer implements Serializable { /** * This is a package header, 7 bytes (FLT2002) */ - private static final byte[] START_DATA = {70,76,84,50,48,48,50}; + private static final byte[] START_DATA = { 70, 76, 84, 50, 48, 48, 50 }; /** * This is the package footer, 7 bytes (TLF2003) */ - private static final byte[] END_DATA = {84,76,70,50,48,48,51}; + private static final byte[] END_DATA = { 84, 76, 70, 50, 48, 48, 51 }; /** * Variable to hold the data @@ -74,18 +72,16 @@ public class XByteBuffer implements Serializable { protected int bufSize = 0; /** - * Flag for discarding invalid packages - * If this flag is set to true, and append(byte[],...) is called, - * the data added will be inspected, and if it doesn't start with - * START_DATA it will be thrown away. - * + * Flag for discarding invalid packages If this flag is set to true, and append(byte[],...) is called, the data + * added will be inspected, and if it doesn't start with START_DATA it will be thrown away. */ protected boolean discard = true; /** * Constructs a new XByteBuffer.
* TODO use a pool of byte[] for performance - * @param size the initial size of the byte buffer + * + * @param size the initial size of the byte buffer * @param discard Flag for discarding invalid packages */ public XByteBuffer(int size, boolean discard) { @@ -93,14 +89,14 @@ public XByteBuffer(int size, boolean discard) { this.discard = discard; } - public XByteBuffer(byte[] data,boolean discard) { - this(data,data.length+128,discard); + public XByteBuffer(byte[] data, boolean discard) { + this(data, data.length + 128, discard); } - public XByteBuffer(byte[] data, int size,boolean discard) { - int length = Math.max(data.length,size); + public XByteBuffer(byte[] data, int size, boolean discard) { + int length = Math.max(data.length, size); buf = new byte[length]; - System.arraycopy(data,0,buf,0,data.length); + System.arraycopy(data, 0, buf, 0, data.length); bufSize = data.length; this.discard = discard; } @@ -110,16 +106,16 @@ public int getLength() { } public void setLength(int size) { - if ( size > buf.length ) { + if (size > buf.length) { throw new ArrayIndexOutOfBoundsException(sm.getString("xByteBuffer.size.larger.buffer")); } bufSize = size; } public void trim(int length) { - if ( (bufSize - length) < 0 ) { - throw new ArrayIndexOutOfBoundsException(sm.getString("xByteBuffer.unableTrim", - Integer.toString(bufSize), Integer.toString(length))); + if ((bufSize - length) < 0) { + throw new ArrayIndexOutOfBoundsException( + sm.getString("xByteBuffer.unableTrim", Integer.toString(bufSize), Integer.toString(length))); } bufSize -= length; } @@ -137,7 +133,7 @@ public byte[] getBytesDirect() { */ public byte[] getBytes() { byte[] b = new byte[bufSize]; - System.arraycopy(buf,0,b,0,bufSize); + System.arraycopy(buf, 0, b, 0, bufSize); return b; } @@ -151,20 +147,23 @@ public void clear() { /** * Appends the data to the buffer. If the data is incorrectly formatted, ie, the data should always start with the * header, false will be returned and the data will be discarded. - * @param b - bytes to be appended + * + * @param b - bytes to be appended * @param len - the number of bytes to append. - * @return true if the data was appended correctly. Returns false if the package is incorrect, ie missing header or something, or the length of data is 0 + * + * @return true if the data was appended correctly. Returns false if the package is incorrect, ie missing header or + * something, or the length of data is 0 */ public boolean append(ByteBuffer b, int len) { int newcount = bufSize + len; if (newcount > buf.length) { expand(newcount); } - b.get(buf,bufSize,len); + b.get(buf, bufSize, len); bufSize = newcount; - if ( discard ) { + if (discard) { if (bufSize > START_DATA.length && (firstIndexOf(buf, 0, START_DATA) == -1)) { bufSize = 0; log.error(sm.getString("xByteBuffer.discarded.invalidHeader")); @@ -191,7 +190,7 @@ public boolean append(boolean i) { if (newcount > buf.length) { expand(newcount); } - toBytes(i,buf,bufSize); + toBytes(i, buf, bufSize); bufSize = newcount; return true; } @@ -201,7 +200,7 @@ public boolean append(long i) { if (newcount > buf.length) { expand(newcount); } - toBytes(i,buf,bufSize); + toBytes(i, buf, bufSize); bufSize = newcount; return true; } @@ -211,14 +210,13 @@ public boolean append(int i) { if (newcount > buf.length) { expand(newcount); } - toBytes(i,buf,bufSize); + toBytes(i, buf, bufSize); bufSize = newcount; return true; } public boolean append(byte[] b, int off, int len) { - if ((off < 0) || (off > b.length) || (len < 0) || - ((off + len) > b.length) || ((off + len) < 0)) { + if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length) || ((off + len) < 0)) { throw new IndexOutOfBoundsException(); } else if (len == 0) { return false; @@ -231,7 +229,7 @@ public boolean append(byte[] b, int off, int len) { System.arraycopy(b, off, buf, bufSize, len); bufSize = newcount; - if ( discard ) { + if (discard) { if (bufSize > START_DATA.length && (firstIndexOf(buf, 0, START_DATA) == -1)) { bufSize = 0; log.error(sm.getString("xByteBuffer.discarded.invalidHeader")); @@ -242,7 +240,7 @@ public boolean append(byte[] b, int off, int len) { } public void expand(int newcount) { - //don't change the allocation strategy + // don't change the allocation strategy byte newbuf[] = new byte[Math.max(buf.length << 1, newcount)]; System.arraycopy(buf, 0, newbuf, 0, bufSize); buf = newbuf; @@ -254,50 +252,49 @@ public int getCapacity() { /** - * Internal mechanism to make a check if a complete package exists - * within the buffer + * Internal mechanism to make a check if a complete package exists within the buffer + * * @return - true if a complete package (header,compress,size,data,footer) exists within the buffer */ public int countPackages() { return countPackages(false); } - public int countPackages(boolean first) - { + public int countPackages(boolean first) { int cnt = 0; int pos = START_DATA.length; int start = 0; - while ( start < bufSize ) { - //first check start header - int index = firstIndexOf(buf,start,START_DATA); - //if the header (START_DATA) isn't the first thing or - //the buffer isn't even 14 bytes - if ( index != start || ((bufSize-start)<14) ) { + while (start < bufSize) { + // first check start header + int index = firstIndexOf(buf, start, START_DATA); + // if the header (START_DATA) isn't the first thing or + // the buffer isn't even 14 bytes + if (index != start || ((bufSize - start) < 14)) { break; } - //next 4 bytes are compress flag not needed for count packages - //then get the size 4 bytes + // next 4 bytes are compress flag not needed for count packages + // then get the size 4 bytes int size = toInt(buf, pos); - //now the total buffer has to be long enough to hold - //START_DATA.length+4+size+END_DATA.length + // now the total buffer has to be long enough to hold + // START_DATA.length+4+size+END_DATA.length pos = start + START_DATA.length + 4 + size; - if ( (pos + END_DATA.length) > bufSize) { + if ((pos + END_DATA.length) > bufSize) { break; } - //and finally check the footer of the package END_DATA + // and finally check the footer of the package END_DATA int newpos = firstIndexOf(buf, pos, END_DATA); - //mismatch, there is no package + // mismatch, there is no package if (newpos != pos) { break; } - //increase the packet count + // increase the packet count cnt++; - //reset the values + // reset the values start = pos + END_DATA.length; pos = start + START_DATA.length; - //we only want to verify that we have at least one package - if ( first ) { + // we only want to verify that we have at least one package + if (first) { break; } } @@ -306,16 +303,18 @@ public int countPackages(boolean first) /** * Method to check if a package exists in this byte buffer. + * * @return - true if a complete package (header,options,size,data,footer) exists within the buffer */ - public boolean doesPackageExist() { - return (countPackages(true)>0); + public boolean doesPackageExist() { + return (countPackages(true) > 0); } /** - * Extracts the message bytes from a package. - * If no package exists, a IllegalStateException will be thrown. + * Extracts the message bytes from a package. If no package exists, a IllegalStateException will be thrown. + * * @param clearFromBuffer - if true, the package will be removed from the byte buffer + * * @return - returns the actual message bytes (header, compress,size and footer not included). */ public XByteBuffer extractDataPackage(boolean clearFromBuffer) { @@ -324,7 +323,7 @@ public XByteBuffer extractDataPackage(boolean clearFromBuffer) { throw new IllegalStateException(sm.getString("xByteBuffer.no.package")); } int size = toInt(buf, START_DATA.length); - XByteBuffer xbuf = BufferPool.getBufferPool().getBuffer(size,false); + XByteBuffer xbuf = BufferPool.getBufferPool().getBuffer(size, false); xbuf.setLength(size); System.arraycopy(buf, START_DATA.length + 4, xbuf.getBytesDirect(), 0, size); if (clearFromBuffer) { @@ -344,21 +343,23 @@ public ChannelData extractPackage(boolean clearFromBuffer) { /** * Creates a complete data package + * * @param cdata - the message data to be contained within the package + * * @return - a full package (header,size,data,footer) */ public static byte[] createDataPackage(ChannelData cdata) { -// return createDataPackage(cdata.getDataPackage()); - //avoid one extra byte array creation + // return createDataPackage(cdata.getDataPackage()); + // avoid one extra byte array creation int dlength = cdata.getDataPackageLength(); int length = getDataPackageLength(dlength); byte[] data = new byte[length]; int offset = 0; System.arraycopy(START_DATA, 0, data, offset, START_DATA.length); offset += START_DATA.length; - toBytes(dlength,data, START_DATA.length); + toBytes(dlength, data, START_DATA.length); offset += 4; - cdata.getDataPackage(data,offset); + cdata.getDataPackage(data, offset); offset += dlength; System.arraycopy(END_DATA, 0, data, offset, END_DATA.length); offset += END_DATA.length; @@ -366,23 +367,22 @@ public static byte[] createDataPackage(ChannelData cdata) { } public static byte[] createDataPackage(byte[] data, int doff, int dlength, byte[] buffer, int bufoff) { - if ( (buffer.length-bufoff) > getDataPackageLength(dlength) ) { + if ((buffer.length - bufoff) > getDataPackageLength(dlength)) { throw new ArrayIndexOutOfBoundsException(sm.getString("xByteBuffer.unableCreate")); } System.arraycopy(START_DATA, 0, buffer, bufoff, START_DATA.length); - toBytes(data.length,buffer, bufoff+START_DATA.length); - System.arraycopy(data, doff, buffer, bufoff+START_DATA.length + 4, dlength); - System.arraycopy(END_DATA, 0, buffer, bufoff+START_DATA.length + 4 + data.length, END_DATA.length); + toBytes(data.length, buffer, bufoff + START_DATA.length); + System.arraycopy(data, doff, buffer, bufoff + START_DATA.length + 4, dlength); + System.arraycopy(END_DATA, 0, buffer, bufoff + START_DATA.length + 4 + data.length, END_DATA.length); return buffer; } public static int getDataPackageLength(int datalength) { - int length = - START_DATA.length + //header length - 4 + //data length indicator - datalength + //actual data length - END_DATA.length; //footer length + int length = START_DATA.length + // header length + 4 + // data length indicator + datalength + // actual data length + END_DATA.length; // footer length return length; } @@ -390,63 +390,59 @@ public static int getDataPackageLength(int datalength) { public static byte[] createDataPackage(byte[] data) { int length = getDataPackageLength(data.length); byte[] result = new byte[length]; - return createDataPackage(data,0,data.length,result,0); + return createDataPackage(data, 0, data.length, result, 0); } -// public static void fillDataPackage(byte[] data, int doff, int dlength, XByteBuffer buf) { -// int pkglen = getDataPackageLength(dlength); -// if ( buf.getCapacity() < pkglen ) buf.expand(pkglen); -// createDataPackage(data,doff,dlength,buf.getBytesDirect(),buf.getLength()); -// } - /** * Convert four bytes to an int - * @param b - the byte array containing the four bytes + * + * @param b - the byte array containing the four bytes * @param off - the offset + * * @return the integer value constructed from the four bytes */ - public static int toInt(byte[] b,int off){ - return ( ( b[off+3]) & 0xFF) + - ( ( ( b[off+2]) & 0xFF) << 8) + - ( ( ( b[off+1]) & 0xFF) << 16) + - ( ( ( b[off+0]) & 0xFF) << 24); + public static int toInt(byte[] b, int off) { + return ((b[off + 3]) & 0xFF) + (((b[off + 2]) & 0xFF) << 8) + (((b[off + 1]) & 0xFF) << 16) + + (((b[off + 0]) & 0xFF) << 24); } /** * Convert eight bytes to a long - * @param b - the byte array containing the four bytes + * + * @param b - the byte array containing the four bytes * @param off - the offset + * * @return the long value constructed from the eight bytes */ - public static long toLong(byte[] b,int off){ - return ( ( (long) b[off+7]) & 0xFF) + - ( ( ( (long) b[off+6]) & 0xFF) << 8) + - ( ( ( (long) b[off+5]) & 0xFF) << 16) + - ( ( ( (long) b[off+4]) & 0xFF) << 24) + - ( ( ( (long) b[off+3]) & 0xFF) << 32) + - ( ( ( (long) b[off+2]) & 0xFF) << 40) + - ( ( ( (long) b[off+1]) & 0xFF) << 48) + - ( ( ( (long) b[off+0]) & 0xFF) << 56); + public static long toLong(byte[] b, int off) { + return (((long) b[off + 7]) & 0xFF) + ((((long) b[off + 6]) & 0xFF) << 8) + + ((((long) b[off + 5]) & 0xFF) << 16) + ((((long) b[off + 4]) & 0xFF) << 24) + + ((((long) b[off + 3]) & 0xFF) << 32) + ((((long) b[off + 2]) & 0xFF) << 40) + + ((((long) b[off + 1]) & 0xFF) << 48) + ((((long) b[off + 0]) & 0xFF) << 56); } /** * Converts a boolean and put it in a byte array. - * @param bool the integer - * @param data the byte buffer in which the boolean will be placed + * + * @param bool the integer + * @param data the byte buffer in which the boolean will be placed * @param offset the offset in the byte array + * * @return the byte array */ public static byte[] toBytes(boolean bool, byte[] data, int offset) { - data[offset] = (byte)(bool?1:0); + data[offset] = (byte) (bool ? 1 : 0); return data; } /** * Converts a byte array entry to boolean. - * @param b byte array + * + * @param b byte array * @param offset within byte array + * * @return true if byte array entry is non-zero, false otherwise */ public static boolean toBoolean(byte[] b, int offset) { @@ -456,56 +452,62 @@ public static boolean toBoolean(byte[] b, int offset) { /** * Converts an integer to four bytes. - * @param n the integer - * @param b the byte buffer in which the integer will be placed + * + * @param n the integer + * @param b the byte buffer in which the integer will be placed * @param offset the offset in the byte array + * * @return four bytes in an array */ public static byte[] toBytes(int n, byte[] b, int offset) { - b[offset+3] = (byte) (n); + b[offset + 3] = (byte) (n); n >>>= 8; - b[offset+2] = (byte) (n); + b[offset + 2] = (byte) (n); n >>>= 8; - b[offset+1] = (byte) (n); + b[offset + 1] = (byte) (n); n >>>= 8; - b[offset+0] = (byte) (n); + b[offset + 0] = (byte) (n); return b; } /** * Converts a long to eight bytes. - * @param n the long - * @param b the byte buffer in which the integer will be placed + * + * @param n the long + * @param b the byte buffer in which the integer will be placed * @param offset the offset in the byte array + * * @return eight bytes in an array */ public static byte[] toBytes(long n, byte[] b, int offset) { - b[offset+7] = (byte) (n); + b[offset + 7] = (byte) (n); n >>>= 8; - b[offset+6] = (byte) (n); + b[offset + 6] = (byte) (n); n >>>= 8; - b[offset+5] = (byte) (n); + b[offset + 5] = (byte) (n); n >>>= 8; - b[offset+4] = (byte) (n); + b[offset + 4] = (byte) (n); n >>>= 8; - b[offset+3] = (byte) (n); + b[offset + 3] = (byte) (n); n >>>= 8; - b[offset+2] = (byte) (n); + b[offset + 2] = (byte) (n); n >>>= 8; - b[offset+1] = (byte) (n); + b[offset + 1] = (byte) (n); n >>>= 8; - b[offset+0] = (byte) (n); + b[offset + 0] = (byte) (n); return b; } /** * Similar to a String.IndexOf, but uses pure bytes. - * @param src - the source bytes to be searched + * + * @param src - the source bytes to be searched * @param srcOff - offset on the source buffer - * @param find - the string to be found within src + * @param find - the string to be found within src + * * @return - the index of the first matching byte. -1 if the find array is not found */ - public static int firstIndexOf(byte[] src, int srcOff, byte[] find){ + public static int firstIndexOf(byte[] src, int srcOff, byte[] find) { int result = -1; if (find.length > src.length) { return result; @@ -513,7 +515,7 @@ public static int firstIndexOf(byte[] src, int srcOff, byte[] find){ if (find.length == 0 || src.length == 0) { return result; } - if (srcOff >= src.length ) { + if (srcOff >= src.length) { throw new ArrayIndexOutOfBoundsException(); } boolean found = false; @@ -522,8 +524,8 @@ public static int firstIndexOf(byte[] src, int srcOff, byte[] find){ byte first = find[0]; int pos = srcOff; while (!found) { - //find the first byte - while (pos < srclen){ + // find the first byte + while (pos < srclen) { if (first == src[pos]) { break; } @@ -533,20 +535,20 @@ public static int firstIndexOf(byte[] src, int srcOff, byte[] find){ return -1; } - //we found the first character - //match the rest of the bytes - they have to match - if ( (srclen - pos) < findlen) { + // we found the first character + // match the rest of the bytes - they have to match + if ((srclen - pos) < findlen) { return -1; } - //assume it does exist + // assume it does exist found = true; - for (int i = 1; ( (i < findlen) && found); i++) { + for (int i = 1; ((i < findlen) && found); i++) { found = (find[i] == src[pos + i]); } if (found) { result = pos; - } else if ( (srclen - pos) < findlen) { - return -1; //no more matches possible + } else if ((srclen - pos) < findlen) { + return -1; // no more matches possible } else { pos++; } @@ -555,34 +557,33 @@ public static int firstIndexOf(byte[] src, int srcOff, byte[] find){ } - public static Serializable deserialize(byte[] data) - throws IOException, ClassNotFoundException, ClassCastException { - return deserialize(data,0,data.length); + public static Serializable deserialize(byte[] data) throws IOException, ClassNotFoundException, ClassCastException { + return deserialize(data, 0, data.length); } public static Serializable deserialize(byte[] data, int offset, int length) - throws IOException, ClassNotFoundException, ClassCastException { - return deserialize(data,offset,length,null); + throws IOException, ClassNotFoundException, ClassCastException { + return deserialize(data, offset, length, null); } private static final AtomicInteger invokecount = new AtomicInteger(0); public static Serializable deserialize(byte[] data, int offset, int length, ClassLoader[] cls) - throws IOException, ClassNotFoundException, ClassCastException { + throws IOException, ClassNotFoundException, ClassCastException { invokecount.addAndGet(1); Object message = null; - if ( cls == null ) { + if (cls == null) { cls = new ClassLoader[0]; } if (data != null && length > 0) { - InputStream instream = new ByteArrayInputStream(data,offset,length); + InputStream instream = new ByteArrayInputStream(data, offset, length); ObjectInputStream stream = null; - stream = (cls.length>0)? new ReplicationStream(instream,cls):new ObjectInputStream(instream); + stream = (cls.length > 0) ? new ReplicationStream(instream, cls) : new ObjectInputStream(instream); message = stream.readObject(); instream.close(); stream.close(); } - if ( message == null ) { + if (message == null) { return null; } else if (message instanceof Serializable) { return (Serializable) message; @@ -593,8 +594,11 @@ public static Serializable deserialize(byte[] data, int offset, int length, Clas /** * Serializes a message into cluster data + * * @param msg ClusterMessage + * * @return serialized content as byte[] array + * * @throws IOException Serialization error */ public static byte[] serialize(Serializable msg) throws IOException {