Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Apply project settings to o.e.equinox.coordinator #362

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion bundles/org.eclipse.equinox.coordinator/about.html
Original file line number Diff line number Diff line change
Expand Up @@ -75,4 +75,4 @@ <h4>OSGi Materials</h4>
<small>Java and all Java-based trademarks and logos are trademarks or registered trademarks of Sun Microsystems, Inc. in the United States and other countries.</small>

</body>
</html>
</html>
2 changes: 1 addition & 1 deletion bundles/org.eclipse.equinox.coordinator/build.properties
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,4 @@ src.includes = about.html,\
about_files/

## Configuration for TCK tests to execute for this project
pom.model.property.tck.artifact = org.osgi.test.cases.coordinator
pom.model.property.tck.artifact = org.osgi.test.cases.coordinator
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,10 @@ public class Activator implements BundleActivator {
public void start(BundleContext bundleContext) throws Exception {
factory = new CoordinatorServiceFactory(bundleContext);
Dictionary<String, Object> properties = new Hashtable<>();
@SuppressWarnings({"unchecked"})
@SuppressWarnings({ "unchecked" })
// Use local variable to avoid suppressing unchecked warnings at method level.
ServiceRegistration<Coordinator> reg = (ServiceRegistration<Coordinator>) bundleContext.registerService(Coordinator.class.getName(), factory, properties);
ServiceRegistration<Coordinator> reg = (ServiceRegistration<Coordinator>) bundleContext
.registerService(Coordinator.class.getName(), factory, properties);
this.registration = reg;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public class CoordinationImpl {
private volatile Throwable failure;
private volatile boolean terminated;
private volatile boolean ending = false;

private Date deadline;
private CoordinationImpl enclosingCoordination;
private Thread thread;
Expand All @@ -62,7 +62,8 @@ public CoordinationImpl(long id, String name, long timeout, CoordinatorImpl coor
this.coordinator = coordinator;
participants = Collections.synchronizedList(new ArrayList<Participant>());
variables = new HashMap<>();
// Not an escaping 'this' reference. It will not escape the thread calling the constructor.
// Not an escaping 'this' reference. It will not escape the thread calling the
// constructor.
referent = new CoordinationReferent(this);
}

Expand All @@ -71,16 +72,16 @@ public void addParticipant(Participant participant) throws CoordinationException
coordinator.checkPermission(CoordinationPermission.PARTICIPATE, name);
if (participant == null)
throw new NullPointerException(NLS.bind(Messages.NullParameter, "participant")); //$NON-NLS-1$
/* The caller has permission. Check to see if the participant is already
/*
* The caller has permission. Check to see if the participant is already
* participating in another coordination. Do this in a loop in case the
* participant must wait for the other coordination to finish. The loop
* will exit under the following circumstances.
* participant must wait for the other coordination to finish. The loop will
* exit under the following circumstances.
*
* (1) This coordination is terminated.
* (2) The participant is already participating in another coordination
* using the same thread as this one.
* (3) This thread is interrupted.
* (4) The participant is not participating in another coordination.
* (1) This coordination is terminated. (2) The participant is already
* participating in another coordination using the same thread as this one. (3)
* This thread is interrupted. (4) The participant is not participating in
* another coordination.
*/
while (true) {
CoordinationImpl coordination;
Expand All @@ -107,7 +108,9 @@ public void addParticipant(Participant participant) throws CoordinationException
// any thread, and there's nothing to compare. If the coordination
// is using this thread, then we can't block due to risk of deadlock.
if (t == Thread.currentThread()) {
throw new CoordinationException(NLS.bind(Messages.Deadlock, new Object[]{participant, getName(), getId()}), referent, CoordinationException.DEADLOCK_DETECTED);
throw new CoordinationException(
NLS.bind(Messages.Deadlock, new Object[] { participant, getName(), getId() }), referent,
CoordinationException.DEADLOCK_DETECTED);
}
}
}
Expand All @@ -119,7 +122,8 @@ public void addParticipant(Participant participant) throws CoordinationException
try {
coordination.join(1000);
} catch (InterruptedException e) {
String message = NLS.bind(Messages.LockInterrupted, new Object[]{participant, name, id, coordination.getName(), coordination.getId()});
String message = NLS.bind(Messages.LockInterrupted,
new Object[] { participant, name, id, coordination.getName(), coordination.getId() });
coordinator.getLogService().log(LogService.LOG_DEBUG, message, e);
// This thread was interrupted while waiting for the coordination
// to terminate.
Expand All @@ -133,20 +137,14 @@ public void end() throws CoordinationException {
// Terminating the coordination must be atomic.
synchronized (this) {
/*
* Set the ending flag to avoid spurious failures for orphans
* It appears the VM can aggressively puts objects on the queue if the last call is done in a finally
* Coordination c = coordinator.begin("name", 0);
* try {
* ...
* } finally {
* c.end()
* }
* In some cases it appears that while in the finally call to c.end()
* that c can become put on the queue for GC.
* This makes it eligible for orphan processing which will cause
* issues below when calling methods that invoke
* CoordinationWeakReference.processOrphanedCoordinations()
* We set an ending flag so that we can detect this
* Set the ending flag to avoid spurious failures for orphans It appears the VM
* can aggressively puts objects on the queue if the last call is done in a
* finally Coordination c = coordinator.begin("name", 0); try { ... } finally {
* c.end() } In some cases it appears that while in the finally call to c.end()
* that c can become put on the queue for GC. This makes it eligible for orphan
* processing which will cause issues below when calling methods that invoke
* CoordinationWeakReference.processOrphanedCoordinations() We set an ending
* flag so that we can detect this
*/
ending = true;
// If this coordination is associated with a thread, an additional
Expand All @@ -155,15 +153,18 @@ public void end() throws CoordinationException {
// Coordinations may only be ended by the same thread that
// pushed them onto the stack, if any.
if (thread != Thread.currentThread()) {
throw new CoordinationException(NLS.bind(Messages.EndingThreadNotSame, new Object[]{name, id, thread, Thread.currentThread()}), referent, CoordinationException.WRONG_THREAD);
throw new CoordinationException(
NLS.bind(Messages.EndingThreadNotSame,
new Object[] { name, id, thread, Thread.currentThread() }),
referent, CoordinationException.WRONG_THREAD);
}
// Unwind the stack in case there are other coordinations higher
// up than this one. See bug 421487 for why peek() may be null.
for (Coordination peeked = coordinator.peek(); !(peeked == null || referent.equals(peeked)); peeked = coordinator.peek()) {
for (Coordination peeked = coordinator.peek(); !(peeked == null
|| referent.equals(peeked)); peeked = coordinator.peek()) {
try {
peeked.end();
}
catch (CoordinationException e) {
} catch (CoordinationException e) {
peeked = coordinator.peek();
if (peeked != null)
peeked.fail(e);
Expand All @@ -188,7 +189,8 @@ public void end() throws CoordinationException {
try {
participant.ended(referent);
} catch (Exception e) {
coordinator.getLogService().log(LogService.LOG_WARNING, NLS.bind(Messages.ParticipantEndedError, new Object[]{participant, name, id}), e);
coordinator.getLogService().log(LogService.LOG_WARNING,
NLS.bind(Messages.ParticipantEndedError, new Object[] { participant, name, id }), e);
// Only the first exception will be propagated.
if (exception == null) {
exception = e;
Expand All @@ -202,7 +204,9 @@ public void end() throws CoordinationException {
}
// If a partial ending has occurred, throw the required exception.
if (exception != null) {
throw new CoordinationException(NLS.bind(Messages.CoordinationPartiallyEnded, new Object[]{name, id, exceptionParticipant}), referent, CoordinationException.PARTIALLY_ENDED, exception);
throw new CoordinationException(
NLS.bind(Messages.CoordinationPartiallyEnded, new Object[] { name, id, exceptionParticipant }),
referent, CoordinationException.PARTIALLY_ENDED, exception);
}
}

Expand Down Expand Up @@ -256,9 +260,11 @@ else if (newTotalTimeout > maxTimeout) {
join(0);
// Now determine how it terminated and throw the appropriate exception.
checkTerminated();
}
catch (InterruptedException e) {
throw new CoordinationException(NLS.bind(Messages.InterruptedTimeoutExtension, new Object[]{totalTimeout, getName(), getId(), timeInMillis}), referent, CoordinationException.UNKNOWN, e);
} catch (InterruptedException e) {
throw new CoordinationException(
NLS.bind(Messages.InterruptedTimeoutExtension,
new Object[] { totalTimeout, getName(), getId(), timeInMillis }),
referent, CoordinationException.UNKNOWN, e);
}
}
// Create the new timeout.
Expand Down Expand Up @@ -297,7 +303,8 @@ public boolean fail(Throwable reason) {
try {
participant.failed(referent);
} catch (Exception e) {
coordinator.getLogService().log(LogService.LOG_WARNING, NLS.bind(Messages.ParticipantFailedError, new Object[]{participant, name, id}), e);
coordinator.getLogService().log(LogService.LOG_WARNING,
NLS.bind(Messages.ParticipantFailedError, new Object[] { participant, name, id }), e);
}
}
synchronized (this) {
Expand Down Expand Up @@ -422,11 +429,13 @@ private void checkTerminated() throws CoordinationException {
// must be thrown.
if (failure != null) {
// The fail() method was called indicating the coordination failed.
throw new CoordinationException(NLS.bind(Messages.CoordinationFailed, name, id), referent, CoordinationException.FAILED, failure);
throw new CoordinationException(NLS.bind(Messages.CoordinationFailed, name, id), referent,
CoordinationException.FAILED, failure);
}
// The coordination did not fail, so it either partially ended or
// ended successfully.
throw new CoordinationException(NLS.bind(Messages.CoordinationEnded, name, id), referent, CoordinationException.ALREADY_ENDED);
throw new CoordinationException(NLS.bind(Messages.CoordinationEnded, name, id), referent,
CoordinationException.ALREADY_ENDED);
}

private void terminate() throws CoordinationException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ public boolean equals(Object object) {
return true;
if (!(object instanceof CoordinationReferent))
return false;
return coordination.equals(((CoordinationReferent)object).coordination);
return coordination.equals(((CoordinationReferent) object).coordination);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,13 @@ public CoordinationTimerTask(CoordinationImpl coordination) {

@Override
public void run() {
// Catch all exceptions and errors in order to prevent the timer
// Catch all exceptions and errors in order to prevent the timer
// thread from stopping.
try {
coordination.fail(Coordination.TIMEOUT);
} catch (Throwable t) {
coordination.getLogService().log(LogService.LOG_ERROR, NLS.bind(Messages.CoordinationTimedOutError, new Object[]{coordination.getName(), coordination.getId(), Thread.currentThread()}), t);
coordination.getLogService().log(LogService.LOG_ERROR, NLS.bind(Messages.CoordinationTimedOutError,
new Object[] { coordination.getName(), coordination.getId(), Thread.currentThread() }), t);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,45 +23,44 @@

public class CoordinationWeakReference extends WeakReference<CoordinationReferent> {
private static final ReferenceQueue<CoordinationReferent> referenceQueue = new ReferenceQueue<>();

public static void processOrphanedCoordinations() {
CoordinationWeakReference r;
while ((r = (CoordinationWeakReference)referenceQueue.poll()) != null) {
while ((r = (CoordinationWeakReference) referenceQueue.poll()) != null) {
CoordinationImpl c = r.getCoordination();
if (!c.isEnding()) {
try {
c.fail(Coordination.ORPHANED);
}
catch (Exception e) {
c.getLogService().log(LogService.LOG_WARNING, NLS.bind(Messages.OrphanedCoordinationError, c.getName(), c.getId()), e);
}
finally {
} catch (Exception e) {
c.getLogService().log(LogService.LOG_WARNING,
NLS.bind(Messages.OrphanedCoordinationError, c.getName(), c.getId()), e);
} finally {
try {
c.end();
}
catch (CoordinationException e) {
} catch (CoordinationException e) {
// This is expected since we already failed the coordination...
if (!Coordination.ORPHANED.equals(e.getCause()))
// ...but only if the cause is ORPHANED.
c.getLogService().log(LogService.LOG_DEBUG, NLS.bind(Messages.OrphanedCoordinationError, c.getName(), c.getId()), e);
}
catch (Exception e) {
c.getLogService().log(LogService.LOG_WARNING, NLS.bind(Messages.OrphanedCoordinationError, c.getName(), c.getId()), e);
c.getLogService().log(LogService.LOG_DEBUG,
NLS.bind(Messages.OrphanedCoordinationError, c.getName(), c.getId()), e);
} catch (Exception e) {
c.getLogService().log(LogService.LOG_WARNING,
NLS.bind(Messages.OrphanedCoordinationError, c.getName(), c.getId()), e);
}
}
}
}
}

private final CoordinationImpl coordination;

public CoordinationWeakReference(CoordinationReferent referent, CoordinationImpl coordination) {
super(referent, referenceQueue);
if (coordination == null)
throw new NullPointerException();
this.coordination = coordination;
}

public CoordinationImpl getCoordination() {
return coordination;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ private synchronized static long getNextId() {
// Coordination IDs must be unique across all using bundles.
private static final Map<Long, CoordinationImpl> idToCoordination = new HashMap<>();
// Coordination participation must be tracked across all using bundles.
private static final Map<Participant, CoordinationImpl> participantToCoordination = Collections.synchronizedMap(new IdentityHashMap<Participant, CoordinationImpl>());
private static final Map<Participant, CoordinationImpl> participantToCoordination = Collections
.synchronizedMap(new IdentityHashMap<Participant, CoordinationImpl>());

private static ThreadLocal<WeakCoordinationStack> coordinationStack = new ThreadLocal<WeakCoordinationStack>() {
@Override
Expand Down Expand Up @@ -85,8 +86,12 @@ public CoordinationImpl pop() {

public void push(CoordinationImpl c) {
if (contains(c))
throw new CoordinationException(NLS.bind(Messages.CoordinationAlreadyExists, new Object[]{c.getName(), c.getId(), Thread.currentThread()}), c.getReferent(), CoordinationException.ALREADY_PUSHED);
c.setThreadAndEnclosingCoordination(Thread.currentThread(), coordinations.isEmpty() ? null : coordinations.getFirst());
throw new CoordinationException(
NLS.bind(Messages.CoordinationAlreadyExists,
new Object[] { c.getName(), c.getId(), Thread.currentThread() }),
c.getReferent(), CoordinationException.ALREADY_PUSHED);
c.setThreadAndEnclosingCoordination(Thread.currentThread(),
coordinations.isEmpty() ? null : coordinations.getFirst());
coordinations.addFirst(c);
}
}
Expand Down Expand Up @@ -131,7 +136,8 @@ public Coordination create(String name, long timeout) {
// Override the requested timeout with the max timeout, if necessary.
if (maxTimeout != 0) {
if (timeout == 0 || maxTimeout < timeout) {
logTracker.log(LogService.LOG_WARNING, NLS.bind(Messages.MaximumTimeout, new Object[]{timeout, maxTimeout, name}));
logTracker.log(LogService.LOG_WARNING,
NLS.bind(Messages.MaximumTimeout, new Object[] { timeout, maxTimeout, name }));
timeout = maxTimeout;
}
}
Expand All @@ -142,7 +148,8 @@ public Coordination create(String name, long timeout) {
CoordinationReferent referent = new CoordinationReferent(coordination);
// Create a weak reference to the referent returned to the initiator. No other
// references to the initiator's referent must be maintained outside of this
// method. A strong reference to the CoordinationWeakReference must be maintained
// method. A strong reference to the CoordinationWeakReference must be
// maintained
// by the coordination in order to avoid garbage collection. It serves no other
// purpose. Just "set it and forget it".
coordination.reference = new CoordinationWeakReference(referent, coordination);
Expand Down Expand Up @@ -182,7 +189,8 @@ public Coordination getCoordination(long id) {
try {
checkPermission(CoordinationPermission.ADMIN, result.getName());
} catch (SecurityException e) {
logTracker.log(LogService.LOG_DEBUG, NLS.bind(Messages.GetCoordinationNotPermitted, new Object[]{Thread.currentThread(), result.getName(), result.getId()}), e);
logTracker.log(LogService.LOG_DEBUG, NLS.bind(Messages.GetCoordinationNotPermitted,
new Object[] { Thread.currentThread(), result.getName(), result.getId() }), e);
result = null;
}
}
Expand All @@ -203,7 +211,8 @@ public Collection<Coordination> getCoordinations() {
checkPermission(CoordinationPermission.ADMIN, coordination.getName());
result.add(coordination.getReferent());
} catch (SecurityException e) {
logTracker.log(LogService.LOG_DEBUG, NLS.bind(Messages.GetCoordinationNotPermitted, new Object[]{Thread.currentThread(), coordination.getName(), coordination.getId()}), e);
logTracker.log(LogService.LOG_DEBUG, NLS.bind(Messages.GetCoordinationNotPermitted,
new Object[] { Thread.currentThread(), coordination.getName(), coordination.getId() }), e);
}
}
}
Expand All @@ -222,7 +231,8 @@ public Coordination peek() {
public Coordination pop() {
CoordinationWeakReference.processOrphanedCoordinations();
CoordinationImpl c = coordinationStack.get().peek();
if (c == null) return null;
if (c == null)
return null;
checkPermission(CoordinationPermission.INITIATE, c.getName());
return coordinationStack.get().pop().getReferent();
}
Expand Down Expand Up @@ -256,7 +266,7 @@ Bundle getBundle() {
LogTracker getLogService() {
return logTracker;
}

long getMaxTimeout() {
return maxTimeout;
}
Expand Down Expand Up @@ -296,7 +306,8 @@ void shutdown() {
* This procedure must occur when a coordination is being failed or ended.
*/
void terminate(CoordinationImpl coordination, List<Participant> participants) {
// A coordination has been terminated and needs to be removed from the thread local stack.
// A coordination has been terminated and needs to be removed from the thread
// local stack.
synchronized (this) {
synchronized (CoordinatorImpl.class) {
this.coordinations.remove(coordination);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ void shutdown() {
timer.cancel();
logTracker.close();
}

private long getMaxTimeout() {
String prop = bundleContext.getProperty("org.eclipse.equinox.coordinator.timeout"); //$NON-NLS-1$
// Intentionally letting the possible NumberFormatException propagate.
Expand Down
Loading
Loading