Skip to content

Commit

Permalink
Code clean-up - formatting. No functional change
Browse files Browse the repository at this point in the history
  • Loading branch information
markt-asf committed May 10, 2024
1 parent 7668f14 commit 6a166ad
Show file tree
Hide file tree
Showing 15 changed files with 502 additions and 482 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@

import org.apache.catalina.tribes.io.ListenCallback;

public abstract class AbstractRxTask implements Runnable
{
public abstract class AbstractRxTask implements Runnable {

public static final int OPTION_DIRECT_BUFFER = ReceiverBase.OPTION_DIRECT_BUFFER;

Expand Down
15 changes: 8 additions & 7 deletions java/org/apache/catalina/tribes/transport/AbstractSender.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public abstract class AbstractSender implements DataSender {
private Member destination;
private InetAddress address;
private int port;
private int maxRetryAttempts = 1;//1 resends
private int maxRetryAttempts = 1;// 1 resends
private int attempt;
private boolean tcpNoDelay = true;
private boolean soKeepAlive = false;
Expand All @@ -52,8 +52,9 @@ public abstract class AbstractSender implements DataSender {

/**
* transfers sender properties from one sender to another
*
* @param from AbstractSender
* @param to AbstractSender
* @param to AbstractSender
*/
public static void transferProperties(AbstractSender from, AbstractSender to) {
to.rxBufSize = from.rxBufSize;
Expand Down Expand Up @@ -87,19 +88,19 @@ public AbstractSender() {
public boolean keepalive() {
boolean disconnect = false;
if (isUdpBased()) {
disconnect = true; //always disconnect UDP, TODO optimize the keepalive handling
} else if ( keepAliveCount >= 0 && requestCount>keepAliveCount ) {
disconnect = true; // always disconnect UDP, TODO optimize the keepalive handling
} else if (keepAliveCount >= 0 && requestCount > keepAliveCount) {
disconnect = true;
} else if ( keepAliveTime >= 0 && (System.currentTimeMillis()-connectTime)>keepAliveTime ) {
} else if (keepAliveTime >= 0 && (System.currentTimeMillis() - connectTime) > keepAliveTime) {
disconnect = true;
}
if ( disconnect ) {
if (disconnect) {
disconnect();
}
return disconnect;
}

protected void setConnected(boolean connected){
protected void setConnected(boolean connected) {
this.connected = connected;
}

Expand Down
8 changes: 4 additions & 4 deletions java/org/apache/catalina/tribes/transport/Constants.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
import org.apache.catalina.tribes.io.XByteBuffer;

/**
* Manifest constants for the <code>org.apache.catalina.tribes.transport</code>
* package.
* Manifest constants for the <code>org.apache.catalina.tribes.transport</code> package.
*
* @author Peter Rossbach
*/
public class Constants {
Expand All @@ -33,8 +33,8 @@ public class Constants {
/*
* Do not change any of these values!
*/
public static final byte[] ACK_DATA = new byte[] {6, 2, 3};
public static final byte[] FAIL_ACK_DATA = new byte[] {11, 0, 5};
public static final byte[] ACK_DATA = new byte[] { 6, 2, 3 };
public static final byte[] FAIL_ACK_DATA = new byte[] { 11, 0, 5 };
public static final byte[] ACK_COMMAND = XByteBuffer.createDataPackage(ACK_DATA);
public static final byte[] FAIL_ACK_COMMAND = XByteBuffer.createDataPackage(FAIL_ACK_DATA);

Expand Down
7 changes: 7 additions & 0 deletions java/org/apache/catalina/tribes/transport/DataSender.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ public interface DataSender {

/**
* Connect.
*
* @throws IOException when an error occurs
*/
void connect() throws IOException;
Expand All @@ -38,36 +39,42 @@ public interface DataSender {

/**
* Set the receive buffer size.
*
* @param size the new size
*/
void setRxBufSize(int size);

/**
* Set the transmit buffer size.
*
* @param size the new size
*/
void setTxBufSize(int size);

/**
* Keepalive.
*
* @return {@code true} if kept alive
*/
boolean keepalive();

/**
* Set the socket timeout.
*
* @param timeout in ms
*/
void setTimeout(long timeout);

/**
* Set the amount of requests during which to keepalive.
*
* @param maxRequests the amount of requests
*/
void setKeepAliveCount(int maxRequests);

/**
* Set the keepalive time.
*
* @param keepAliveTimeInMs the time in ms
*/
void setKeepAliveTime(long keepAliveTimeInMs);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
* limitations under the License.
*/
package org.apache.catalina.tribes.transport;

import org.apache.catalina.tribes.ChannelException;
import org.apache.catalina.tribes.ChannelMessage;
import org.apache.catalina.tribes.Member;
Expand All @@ -23,32 +24,38 @@ public interface MultiPointSender extends DataSender {

/**
* Send the specified message.
*
* @param destination the message destinations
* @param data the data to send
* @param data the data to send
*
* @throws ChannelException if an error occurs
*/
void sendMessage(Member[] destination, ChannelMessage data) throws ChannelException;

/**
* Set the maximum retry attempts.
*
* @param attempts the retry count
*/
void setMaxRetryAttempts(int attempts);

/**
* Configure the use of a direct buffer.
*
* @param directBuf {@code true} to use a direct buffer
*/
void setDirectBuffer(boolean directBuf);

/**
* Send to the specified member.
*
* @param member the member
*/
void add(Member member);

/**
* Stop sending to the specified member.
*
* @param member the member
*/
void remove(Member member);
Expand Down
51 changes: 26 additions & 25 deletions java/org/apache/catalina/tribes/transport/PooledSender.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,14 @@
public abstract class PooledSender extends AbstractSender implements MultiPointSender {

private static final Log log = LogFactory.getLog(PooledSender.class);
protected static final StringManager sm =
StringManager.getManager(Constants.Package);
protected static final StringManager sm = StringManager.getManager(Constants.Package);

private final SenderQueue queue;
private int poolSize = 25;
private long maxWait = 3000;

public PooledSender() {
queue = new SenderQueue(this,poolSize);
queue = new SenderQueue(this, poolSize);
}

public abstract DataSender getNewDataSender();
Expand All @@ -51,7 +51,7 @@ public void returnSender(DataSender sender) {

@Override
public synchronized void connect() throws IOException {
//do nothing, happens in the socket sender itself
// do nothing, happens in the socket sender itself
queue.open();
setConnected(true);
}
Expand Down Expand Up @@ -91,8 +91,8 @@ public void setMaxWait(long maxWait) {

@Override
public boolean keepalive() {
//do nothing, the pool checks on every return
return (queue==null)?false:queue.checkIdleKeepAlive();
// do nothing, the pool checks on every return
return (queue == null) ? false : queue.checkIdleKeepAlive();
}

@Override
Expand All @@ -102,12 +102,12 @@ public void add(Member member) {

@Override
public void remove(Member member) {
//no op for now, should not cancel out any keys
//can create serious sync issues
//all TCP connections are cleared out through keepalive
//and if remote node disappears
// no op for now, should not cancel out any keys
// can create serious sync issues
// all TCP connections are cleared out through keepalive
// and if remote node disappears
}
// ----------------------------------------------------- Inner Class
// ----------------------------------------------------- Inner Class

private static class SenderQueue {
private int limit = 25;
Expand All @@ -133,6 +133,7 @@ private static class SenderQueue {
public int getLimit() {
return limit;
}

/**
* @param limit The limit to set.
*/
Expand All @@ -159,7 +160,7 @@ public synchronized boolean checkIdleKeepAlive() {

public synchronized DataSender getSender(long timeout) {
long start = System.currentTimeMillis();
while ( true ) {
while (true) {
if (!isOpen) {
throw new IllegalStateException(sm.getString("pooledSender.closed.queue"));
}
Expand All @@ -172,35 +173,35 @@ public synchronized DataSender getSender(long timeout) {
if (sender != null) {
inuse.add(sender);
return sender;
}//end if
} // end if
long delta = System.currentTimeMillis() - start;
if ( delta > timeout && timeout>0) {
if (delta > timeout && timeout > 0) {
return null;
} else {
try {
wait(Math.max(timeout - delta,1));
}catch (InterruptedException x){}
}//end if
wait(Math.max(timeout - delta, 1));
} catch (InterruptedException x) {
}
} // end if
}
}

public synchronized void returnSender(DataSender sender) {
if ( !isOpen) {
if (!isOpen) {
sender.disconnect();
return;
}
//to do
// to do
inuse.remove(sender);
//just in case the limit has changed
if ( notinuse.size() < this.getLimit() ) {
// just in case the limit has changed
if (notinuse.size() < this.getLimit()) {
notinuse.add(sender);
} else {
try {
sender.disconnect();
} catch (Exception e) {
if (log.isDebugEnabled()) {
log.debug(sm.getString(
"PooledSender.senderDisconnectFail"), e);
log.debug(sm.getString("PooledSender.senderDisconnectFail"), e);
}
}
}
Expand All @@ -214,11 +215,11 @@ public synchronized void close() {
for (Object value : unused) {
DataSender sender = (DataSender) value;
sender.disconnect();
}//for
} // for
for (Object o : used) {
DataSender sender = (DataSender) o;
sender.disconnect();
}//for
} // for
notinuse.clear();
inuse.clear();
notifyAll();
Expand Down
Loading

0 comments on commit 6a166ad

Please sign in to comment.