Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build.properties
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,4 @@

javac.source=1.5

lib.version=0.1.4
lib.version=0.1.5-rc0
27 changes: 27 additions & 0 deletions build.xml
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,33 @@ limitations under the License.
<batchtest todir=".">
<fileset dir="${dir.build.java}">
<include name="**/*IntTests.class"/>
<exclude name="**/FactoryIntTests.class"/>
</fileset>
</batchtest>
</junit>
</target>

<!-- ******************************************************************************* -->
<!-- Run the integration tests. Servers on 27017 and 27018 must be running to work. -->
<!-- ******************************************************************************* -->

<target name="test-factory" depends="unit">

<junit fork="yes" haltonfailure="true">
<jvmarg value="-Duser.timezone=GMT"/>
<jvmarg value="-Dfile.encoding=UTF-8"/>
<classpath refid="classpath.all"/>

<classpath>
<pathelement path="${dir.build.java}"/>
<pathelement path="${dir.conf}"/>
</classpath>

<formatter type="brief" usefile="false"/>

<batchtest todir=".">
<fileset dir="${dir.build.java}">
<include name="**/FactoryIntTests.class"/>
</fileset>
</batchtest>
</junit>
Expand Down
7 changes: 6 additions & 1 deletion src/main/com/deftlabs/lock/mongo/DistributedLock.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,15 @@
public interface DistributedLock extends Lock {

/**
* Returns true if the lock is currently locked by the local process.
* Returns true if lock's held by this JVM / process.
*/
public boolean isLocked();

/**
* Returns true if the lock is currently locked by any process.
*/
public boolean isDistributedLocked();

/**
* Returns the lock name.
*/
Expand Down
4 changes: 4 additions & 0 deletions src/main/com/deftlabs/lock/mongo/DistributedLockSvc.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,5 +53,9 @@ public interface DistributedLockSvc {
*/
public boolean isRunning();

/**
* Returns lock service options
*/
public DistributedLockSvcOptions getSvcOptions();
}

22 changes: 6 additions & 16 deletions src/main/com/deftlabs/lock/mongo/DistributedLockSvcFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package com.deftlabs.lock.mongo;

// Lib

import com.deftlabs.lock.mongo.impl.SvcImpl;

/**
Expand All @@ -25,29 +26,18 @@
public final class DistributedLockSvcFactory {

public DistributedLockSvcFactory(final DistributedLockSvcOptions pOptions) {
_options = pOptions;
_options = pOptions.copy();
}

/**
* Returns the global lock service. This method also calls the startup method on the
* lock service returned (when it is created).
* Returns the lock service for the specific lock options if one exists.
* Otherwise if this is the first time these options have been seen, creates
* a new lock service implementation.
*/
public DistributedLockSvc getLockSvc() {
if (_lockSvc != null && _lockSvc.isRunning()) return _lockSvc;

synchronized(_mutex) {
if (_lockSvc != null && _lockSvc.isRunning()) return _lockSvc;

final SvcImpl svc = new SvcImpl(_options);
svc.startup();
_lockSvc = svc;
return _lockSvc;
}
return SvcImpl.create(_options);
}

private static volatile DistributedLockSvc _lockSvc = null;

private final DistributedLockSvcOptions _options;
private final static Object _mutex = new Object();
}

48 changes: 48 additions & 0 deletions src/main/com/deftlabs/lock/mongo/DistributedLockSvcOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,54 @@ public DistributedLockSvcOptions( final String pMongoUri,
public void setHistoryCollectionName(final String pV) { _historyCollectionName = pV; }
public String getHistoryCollectionName() { return _historyCollectionName; }

public DistributedLockSvcOptions copy() {
DistributedLockSvcOptions copy = new DistributedLockSvcOptions( _mongoUri, _dbName, _collectionName, _appName );

copy._historyCollectionName = this._historyCollectionName;
copy._hostname = this._hostname;
copy._hostAddress = this._hostAddress;
copy._enableHistory = this._enableHistory;
copy._historyIsCapped = this._historyIsCapped;
copy._historySize = this._historySize;

return copy;
}

@Override
public boolean equals(Object o) {
if ( this == o ) return true;
if ( o == null || getClass() != o.getClass() ) return false;

DistributedLockSvcOptions that = (DistributedLockSvcOptions) o;

if ( _appName != null ? !_appName.equals( that._appName ) : that._appName != null ) return false;
if ( _collectionName != null ? !_collectionName.equals( that._collectionName ) : that._collectionName != null )
return false;
if ( _dbName != null ? !_dbName.equals( that._dbName ) : that._dbName != null ) return false;
if ( _mongoUri != null ? !_mongoUri.equals( that._mongoUri ) : that._mongoUri != null ) return false;

return true;
}

@Override
public int hashCode() {
int result = _mongoUri != null ? _mongoUri.hashCode() : 0;
result = 31 * result + ( _dbName != null ? _dbName.hashCode() : 0 );
result = 31 * result + ( _collectionName != null ? _collectionName.hashCode() : 0 );
result = 31 * result + ( _appName != null ? _appName.hashCode() : 0 );
return result;
}

@Override
public String toString() {
return "DistributedLockSvcOptions{" +
"_collectionName='" + _collectionName + '\'' +
", _appName='" + _appName + '\'' +
", _dbName='" + _dbName + '\'' +
", _mongoUri='" + _mongoUri + '\'' +
'}';
}

private final String _mongoUri;
private final String _dbName;
private final String _collectionName;
Expand Down
25 changes: 19 additions & 6 deletions src/main/com/deftlabs/lock/mongo/impl/LockImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
// Java
import java.util.Queue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.ConcurrentLinkedQueue;
Expand Down Expand Up @@ -131,6 +130,16 @@ private boolean park(final long pNanos) {
return locked;
}

@Override
protected void finalize() throws Throwable {
super.finalize();

// only destroy if not already cleaned up by user
synchronized (this) {
if ( _running.get() ) { destroy(); }
}
}

/**
* Try and lock the distributed lock.
*/
Expand Down Expand Up @@ -189,25 +198,29 @@ private boolean tryDistributedLock() {
* Called to initialize the lock.
*/
synchronized void init() {
if (_running.get()) throw new IllegalStateException("init already called");
_running.set(true);
if (!_running.compareAndSet(false, true)) throw new IllegalStateException("init already called");
}

/**
* Called to destroy the lock.
*/
synchronized void destroy() {
if (!_running.get()) throw new IllegalStateException("destroy already called");
_running.set(false);
if (!_running.compareAndSet(true, false)) throw new IllegalStateException("destroy already called");
for (final Thread t : _waitingThreads) t.interrupt();
_waitingThreads.clear();
}

/**
* Returns true if the lock is currently locked.
* Returns true if lock's held by this JVM / process.
*/
@Override public boolean isLocked() { return _locked.get(); }

/**
* Returns true if the lock is currently locked by any process.
*/
@Override public boolean isDistributedLocked()
{ return isLocked() || LockDao.isLocked(_mongo, _name, _svcOptions); }

/**
* Returns the lock name.
*/
Expand Down
28 changes: 17 additions & 11 deletions src/main/com/deftlabs/lock/mongo/impl/Monitor.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import org.bson.types.ObjectId;

// Java
import java.util.Map;
import java.util.*;
import java.util.logging.Level;
import java.util.logging.Logger;

Expand All @@ -41,8 +41,9 @@ final class Monitor {
* closed properly (based on the lock/unlock) contract. This can happen when processes
* die unexpectedly (e.g., out of memory) or when they are not stopped properly (e.g., kill -9).
*/
static class LockHeartbeat implements Runnable {
@Override public void run() {
static class LockHeartbeat extends Thread {
@Override
public void run() {
while (_running) {
try {
for (final String lockName : _locks.keySet()) {
Expand All @@ -66,9 +67,11 @@ static class LockHeartbeat implements Runnable {
final DistributedLockSvcOptions pSvcOptions,
final Map<String, DistributedLock> pLocks)
{
super("MongoDistributedLock-Heartbeat-" + pMongo.getConnectPoint());
_mongo = pMongo;
_svcOptions = pSvcOptions;
_locks = pLocks;
setDaemon(true);
}

private static final long HEARTBEAT_FREQUENCY = 5000;
Expand All @@ -85,8 +88,9 @@ static class LockHeartbeat implements Runnable {
* timeout thread runs in each process this lock lib is running. This thread is
* responsible for cleaning up expired locks (based on time since last heartbeat).
*/
static class LockTimeout implements Runnable {
@Override public void run() {
static class LockTimeout extends Thread {
@Override
public void run() {
while (_running) {
try {

Expand All @@ -101,8 +105,10 @@ static class LockTimeout implements Runnable {
LockTimeout(final Mongo pMongo,
final DistributedLockSvcOptions pSvcOptions)
{
super("MongoDistributedLock-LockTimeout-" + pMongo.getConnectPoint());
_mongo = pMongo;
_svcOptions = pSvcOptions;
setDaemon(true);
}

private static final long CHECK_FREQUENCY = 60000;
Expand All @@ -118,17 +124,15 @@ static class LockTimeout implements Runnable {
* The lock unlocked thread is responsible for waking up local
* threads when a lock state changes.
*/
static class LockUnlocked implements Runnable {
@Override public void run() {
static class LockUnlocked extends Thread {
@Override
public void run() {
while (_running) {
try {
for (final String lockName : _locks.keySet()) {
final DistributedLock lock = _locks.get(lockName);

if (lock.isLocked()) continue;

// Check to see if this is locked.
if (LockDao.isLocked(_mongo, lockName, _svcOptions)) continue;
if (lock.isDistributedLocked()) continue;

// The lock is not locked, wakeup any blocking threads.
lock.wakeupBlocked();
Expand All @@ -144,9 +148,11 @@ static class LockUnlocked implements Runnable {
final DistributedLockSvcOptions pSvcOptions,
final Map<String, DistributedLock> pLocks)
{
super("MongoDistributedLock-LockUnlocked-" + pMongo.getConnectPoint());
_mongo = pMongo;
_svcOptions = pSvcOptions;
_locks = pLocks;
setDaemon(true);
}

private static final long FREQUENCY = 1000;
Expand Down
Loading