diff --git a/build.properties b/build.properties index 35ae151..bd515ae 100644 --- a/build.properties +++ b/build.properties @@ -16,4 +16,4 @@ javac.source=1.5 -lib.version=0.1.4 +lib.version=0.1.5-rc0 diff --git a/build.xml b/build.xml index 5461902..3473a58 100644 --- a/build.xml +++ b/build.xml @@ -183,6 +183,33 @@ limitations under the License. + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/src/main/com/deftlabs/lock/mongo/DistributedLock.java b/src/main/com/deftlabs/lock/mongo/DistributedLock.java index c5de067..85e60f4 100644 --- a/src/main/com/deftlabs/lock/mongo/DistributedLock.java +++ b/src/main/com/deftlabs/lock/mongo/DistributedLock.java @@ -28,10 +28,15 @@ public interface DistributedLock extends Lock { /** - * Returns true if the lock is currently locked by the local process. + * Returns true if lock's held by this JVM / process. */ public boolean isLocked(); + /** + * Returns true if the lock is currently locked by any process. + */ + public boolean isDistributedLocked(); + /** * Returns the lock name. */ diff --git a/src/main/com/deftlabs/lock/mongo/DistributedLockSvc.java b/src/main/com/deftlabs/lock/mongo/DistributedLockSvc.java index b2e25c9..e68bdcb 100644 --- a/src/main/com/deftlabs/lock/mongo/DistributedLockSvc.java +++ b/src/main/com/deftlabs/lock/mongo/DistributedLockSvc.java @@ -53,5 +53,9 @@ public interface DistributedLockSvc { */ public boolean isRunning(); + /** + * Returns lock service options + */ + public DistributedLockSvcOptions getSvcOptions(); } diff --git a/src/main/com/deftlabs/lock/mongo/DistributedLockSvcFactory.java b/src/main/com/deftlabs/lock/mongo/DistributedLockSvcFactory.java index 4361a02..b5514c5 100644 --- a/src/main/com/deftlabs/lock/mongo/DistributedLockSvcFactory.java +++ b/src/main/com/deftlabs/lock/mongo/DistributedLockSvcFactory.java @@ -17,6 +17,7 @@ package com.deftlabs.lock.mongo; // Lib + import com.deftlabs.lock.mongo.impl.SvcImpl; /** @@ -25,29 +26,18 @@ public final class DistributedLockSvcFactory { public DistributedLockSvcFactory(final DistributedLockSvcOptions pOptions) { - _options = pOptions; + _options = pOptions.copy(); } /** - * Returns the global lock service. This method also calls the startup method on the - * lock service returned (when it is created). + * Returns the lock service for the specific lock options if one exists. + * Otherwise if this is the first time these options have been seen, creates + * a new lock service implementation. */ public DistributedLockSvc getLockSvc() { - if (_lockSvc != null && _lockSvc.isRunning()) return _lockSvc; - - synchronized(_mutex) { - if (_lockSvc != null && _lockSvc.isRunning()) return _lockSvc; - - final SvcImpl svc = new SvcImpl(_options); - svc.startup(); - _lockSvc = svc; - return _lockSvc; - } + return SvcImpl.create(_options); } - private static volatile DistributedLockSvc _lockSvc = null; - private final DistributedLockSvcOptions _options; - private final static Object _mutex = new Object(); } diff --git a/src/main/com/deftlabs/lock/mongo/DistributedLockSvcOptions.java b/src/main/com/deftlabs/lock/mongo/DistributedLockSvcOptions.java index 529008a..966f7b4 100644 --- a/src/main/com/deftlabs/lock/mongo/DistributedLockSvcOptions.java +++ b/src/main/com/deftlabs/lock/mongo/DistributedLockSvcOptions.java @@ -107,6 +107,54 @@ public DistributedLockSvcOptions( final String pMongoUri, public void setHistoryCollectionName(final String pV) { _historyCollectionName = pV; } public String getHistoryCollectionName() { return _historyCollectionName; } + public DistributedLockSvcOptions copy() { + DistributedLockSvcOptions copy = new DistributedLockSvcOptions( _mongoUri, _dbName, _collectionName, _appName ); + + copy._historyCollectionName = this._historyCollectionName; + copy._hostname = this._hostname; + copy._hostAddress = this._hostAddress; + copy._enableHistory = this._enableHistory; + copy._historyIsCapped = this._historyIsCapped; + copy._historySize = this._historySize; + + return copy; + } + + @Override + public boolean equals(Object o) { + if ( this == o ) return true; + if ( o == null || getClass() != o.getClass() ) return false; + + DistributedLockSvcOptions that = (DistributedLockSvcOptions) o; + + if ( _appName != null ? !_appName.equals( that._appName ) : that._appName != null ) return false; + if ( _collectionName != null ? !_collectionName.equals( that._collectionName ) : that._collectionName != null ) + return false; + if ( _dbName != null ? !_dbName.equals( that._dbName ) : that._dbName != null ) return false; + if ( _mongoUri != null ? !_mongoUri.equals( that._mongoUri ) : that._mongoUri != null ) return false; + + return true; + } + + @Override + public int hashCode() { + int result = _mongoUri != null ? _mongoUri.hashCode() : 0; + result = 31 * result + ( _dbName != null ? _dbName.hashCode() : 0 ); + result = 31 * result + ( _collectionName != null ? _collectionName.hashCode() : 0 ); + result = 31 * result + ( _appName != null ? _appName.hashCode() : 0 ); + return result; + } + + @Override + public String toString() { + return "DistributedLockSvcOptions{" + + "_collectionName='" + _collectionName + '\'' + + ", _appName='" + _appName + '\'' + + ", _dbName='" + _dbName + '\'' + + ", _mongoUri='" + _mongoUri + '\'' + + '}'; + } + private final String _mongoUri; private final String _dbName; private final String _collectionName; diff --git a/src/main/com/deftlabs/lock/mongo/impl/LockImpl.java b/src/main/com/deftlabs/lock/mongo/impl/LockImpl.java index eb63fcd..6b4c045 100644 --- a/src/main/com/deftlabs/lock/mongo/impl/LockImpl.java +++ b/src/main/com/deftlabs/lock/mongo/impl/LockImpl.java @@ -28,7 +28,6 @@ // Java import java.util.Queue; import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.Condition; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.ConcurrentLinkedQueue; @@ -131,6 +130,16 @@ private boolean park(final long pNanos) { return locked; } + @Override + protected void finalize() throws Throwable { + super.finalize(); + + // only destroy if not already cleaned up by user + synchronized (this) { + if ( _running.get() ) { destroy(); } + } + } + /** * Try and lock the distributed lock. */ @@ -189,25 +198,29 @@ private boolean tryDistributedLock() { * Called to initialize the lock. */ synchronized void init() { - if (_running.get()) throw new IllegalStateException("init already called"); - _running.set(true); + if (!_running.compareAndSet(false, true)) throw new IllegalStateException("init already called"); } /** * Called to destroy the lock. */ synchronized void destroy() { - if (!_running.get()) throw new IllegalStateException("destroy already called"); - _running.set(false); + if (!_running.compareAndSet(true, false)) throw new IllegalStateException("destroy already called"); for (final Thread t : _waitingThreads) t.interrupt(); _waitingThreads.clear(); } /** - * Returns true if the lock is currently locked. + * Returns true if lock's held by this JVM / process. */ @Override public boolean isLocked() { return _locked.get(); } + /** + * Returns true if the lock is currently locked by any process. + */ + @Override public boolean isDistributedLocked() + { return isLocked() || LockDao.isLocked(_mongo, _name, _svcOptions); } + /** * Returns the lock name. */ diff --git a/src/main/com/deftlabs/lock/mongo/impl/Monitor.java b/src/main/com/deftlabs/lock/mongo/impl/Monitor.java index 8f76601..8307945 100644 --- a/src/main/com/deftlabs/lock/mongo/impl/Monitor.java +++ b/src/main/com/deftlabs/lock/mongo/impl/Monitor.java @@ -25,7 +25,7 @@ import org.bson.types.ObjectId; // Java -import java.util.Map; +import java.util.*; import java.util.logging.Level; import java.util.logging.Logger; @@ -41,8 +41,9 @@ final class Monitor { * closed properly (based on the lock/unlock) contract. This can happen when processes * die unexpectedly (e.g., out of memory) or when they are not stopped properly (e.g., kill -9). */ - static class LockHeartbeat implements Runnable { - @Override public void run() { + static class LockHeartbeat extends Thread { + @Override + public void run() { while (_running) { try { for (final String lockName : _locks.keySet()) { @@ -66,9 +67,11 @@ static class LockHeartbeat implements Runnable { final DistributedLockSvcOptions pSvcOptions, final Map pLocks) { + super("MongoDistributedLock-Heartbeat-" + pMongo.getConnectPoint()); _mongo = pMongo; _svcOptions = pSvcOptions; _locks = pLocks; + setDaemon(true); } private static final long HEARTBEAT_FREQUENCY = 5000; @@ -85,8 +88,9 @@ static class LockHeartbeat implements Runnable { * timeout thread runs in each process this lock lib is running. This thread is * responsible for cleaning up expired locks (based on time since last heartbeat). */ - static class LockTimeout implements Runnable { - @Override public void run() { + static class LockTimeout extends Thread { + @Override + public void run() { while (_running) { try { @@ -101,8 +105,10 @@ static class LockTimeout implements Runnable { LockTimeout(final Mongo pMongo, final DistributedLockSvcOptions pSvcOptions) { + super("MongoDistributedLock-LockTimeout-" + pMongo.getConnectPoint()); _mongo = pMongo; _svcOptions = pSvcOptions; + setDaemon(true); } private static final long CHECK_FREQUENCY = 60000; @@ -118,17 +124,15 @@ static class LockTimeout implements Runnable { * The lock unlocked thread is responsible for waking up local * threads when a lock state changes. */ - static class LockUnlocked implements Runnable { - @Override public void run() { + static class LockUnlocked extends Thread { + @Override + public void run() { while (_running) { try { for (final String lockName : _locks.keySet()) { final DistributedLock lock = _locks.get(lockName); - if (lock.isLocked()) continue; - - // Check to see if this is locked. - if (LockDao.isLocked(_mongo, lockName, _svcOptions)) continue; + if (lock.isDistributedLocked()) continue; // The lock is not locked, wakeup any blocking threads. lock.wakeupBlocked(); @@ -144,9 +148,11 @@ static class LockUnlocked implements Runnable { final DistributedLockSvcOptions pSvcOptions, final Map pLocks) { + super("MongoDistributedLock-LockUnlocked-" + pMongo.getConnectPoint()); _mongo = pMongo; _svcOptions = pSvcOptions; _locks = pLocks; + setDaemon(true); } private static final long FREQUENCY = 1000; diff --git a/src/main/com/deftlabs/lock/mongo/impl/SvcImpl.java b/src/main/com/deftlabs/lock/mongo/impl/SvcImpl.java index 66baba7..01fffd4 100644 --- a/src/main/com/deftlabs/lock/mongo/impl/SvcImpl.java +++ b/src/main/com/deftlabs/lock/mongo/impl/SvcImpl.java @@ -28,6 +28,7 @@ import com.mongodb.MongoURI; // Java +import java.util.HashMap; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.locks.ReentrantLock; @@ -38,10 +39,15 @@ */ public final class SvcImpl implements DistributedLockSvc { - public SvcImpl(final DistributedLockSvcOptions pOptions) { + private SvcImpl(final DistributedLockSvcOptions pOptions) { _options = pOptions; } + @Override + public DistributedLockSvcOptions getSvcOptions() { + return _options.copy(); + } + /** * Returns a new lock. If the name is already in use and the lock has been * created, it returns the existing lock. @@ -77,9 +83,8 @@ public DistributedLock create(final String pLockName) { @Override public void destroy(final DistributedLock pLock) { + _lock.lock(); try { - _lock.lock(); - if (!_locks.containsKey(pLock.getName())) { throw new DistributedLockException("Lock has already been destroyed: " + pLock.getName()); } @@ -97,9 +102,8 @@ public void destroy(final DistributedLock pLock) { */ @Override public void startup() { - _running.set(true); + _lock.lock(); try { - _lock.lock(); _mongo = new Mongo(new MongoURI(_options.getMongoUri())); @@ -109,15 +113,15 @@ public void startup() { // Init the monitor threads. _lockHeartbeat = new Monitor.LockHeartbeat(_mongo, _options, _locks); - (new Thread(_lockHeartbeat)).start(); + _lockHeartbeat.start(); _lockTimeout = new Monitor.LockTimeout(_mongo, _options); - (new Thread(_lockTimeout)).start(); + _lockTimeout.start(); _lockUnlocked = new Monitor.LockUnlocked(_mongo, _options, _locks); - (new Thread(_lockUnlocked)).start(); - + _lockUnlocked.start(); + _running.set( true ); } catch (final Throwable t) { throw new DistributedLockException(t); } finally { _lock.unlock(); } } @@ -127,12 +131,11 @@ public void startup() { */ @Override public void shutdown() { - - if (!_running.get()) throw new IllegalStateException("shutdown called but not running"); - _running.set(false); - + _lock.lock(); try { - _lock.lock(); + + // Performed while holding lock for finalize behavior + if (!_running.compareAndSet(true, false)) throw new IllegalStateException("shutdown called but not running"); // Interrupt the locks. for (final String lockName : _locks.keySet()) { @@ -146,10 +149,24 @@ public void shutdown() { _lockHeartbeat.stopRunning(); _lockUnlocked.stopRunning(); + destroy( _options ); } catch (final Throwable t) { throw new DistributedLockException(t); } finally { _lock.unlock(); } } + @Override + protected void finalize() throws Throwable { + super.finalize(); + + _lock.lock(); + try { + // only shutdown if not already cleaned up by user + if ( _running.get() ) { shutdown(); } + } finally { + _lock.unlock(); + } + } + @Override public boolean isRunning() { return _running.get(); } @@ -166,5 +183,32 @@ public void shutdown() { private final Map _locks = new ConcurrentHashMap(); + private static final Map _lockSvcs + = new HashMap(); + + private static void destroy(final DistributedLockSvcOptions pOptions) { + if ( _lockSvcs.remove(pOptions) == null ) { + throw new IllegalStateException( "lock service already unregistered for: " + pOptions ); + } + } + + public static DistributedLockSvc create(final DistributedLockSvcOptions pOptions) { + DistributedLockSvc svc = _lockSvcs.get(pOptions); + if ( svc != null && svc.isRunning() ) { return svc; } + + synchronized(SvcImpl.class) { + svc = _lockSvcs.get(pOptions); + if ( svc != null && svc.isRunning() ) { return svc; } + + svc = new SvcImpl(pOptions); + svc.startup(); + if ( _lockSvcs.put( pOptions, svc ) != null ) { + throw new IllegalStateException( "lock service already registered for: " + pOptions ); + } + + return svc; + } + } + } diff --git a/src/test/com/deftlabs/lock/mongo/FactoryIntTests.java b/src/test/com/deftlabs/lock/mongo/FactoryIntTests.java new file mode 100644 index 0000000..8311c07 --- /dev/null +++ b/src/test/com/deftlabs/lock/mongo/FactoryIntTests.java @@ -0,0 +1,46 @@ +package com.deftlabs.lock.mongo; + +import org.junit.Ignore; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertSame; + +/** + * Tests service creation semantics. You must be running an instance of mongo on localhost:27017 and localhost:27018 + */ +public final class FactoryIntTests { + + private static final String MONGO_URI1 = "mongodb://127.0.0.1:27017/?maxpoolsize=10&waitqueuemultiple=5&connecttimeoutms=20000&sockettimeoutms=20000&autoconnectretry=true"; + private static final String MONGO_URI2 = "mongodb://127.0.0.1:27018/?maxpoolsize=10&waitqueuemultiple=5&connecttimeoutms=20000&sockettimeoutms=20000&autoconnectretry=true"; + + @Test + public void factoryMaintainsSingleInstance() throws Exception { + final DistributedLockSvcOptions options1 = new DistributedLockSvcOptions(MONGO_URI1); + final DistributedLockSvc svc1 = new DistributedLockSvcFactory(options1).getLockSvc(); + + final DistributedLockSvcOptions options2 = new DistributedLockSvcOptions(MONGO_URI1); + final DistributedLockSvc svc2 = new DistributedLockSvcFactory(options2).getLockSvc(); + + assertSame(svc1, svc2); + assertEquals(svc1, svc2); + } + + @Test public void canCreateSeparatelyConfiguredServices() throws Exception { + final DistributedLockSvcOptions options1 = new DistributedLockSvcOptions(MONGO_URI1); + final DistributedLockSvc svc1 = new DistributedLockSvcFactory(options1).getLockSvc(); + + final DistributedLockSvcOptions options2 = new DistributedLockSvcOptions(MONGO_URI2); + final DistributedLockSvc svc2 = new DistributedLockSvcFactory(options2).getLockSvc(); + + assertEquals(MONGO_URI1, svc1.getSvcOptions().getMongoUri()); + assertEquals(MONGO_URI2, svc2.getSvcOptions().getMongoUri()); + } + + @Ignore + @Test(expected = IllegalArgumentException.class) + public void cannotRedefineMongoOptionsForSameHosts() throws Exception { + // TODO + } + +}