diff --git a/build.properties b/build.properties
index 35ae151..bd515ae 100644
--- a/build.properties
+++ b/build.properties
@@ -16,4 +16,4 @@
javac.source=1.5
-lib.version=0.1.4
+lib.version=0.1.5-rc0
diff --git a/build.xml b/build.xml
index 5461902..3473a58 100644
--- a/build.xml
+++ b/build.xml
@@ -183,6 +183,33 @@ limitations under the License.
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/src/main/com/deftlabs/lock/mongo/DistributedLock.java b/src/main/com/deftlabs/lock/mongo/DistributedLock.java
index c5de067..85e60f4 100644
--- a/src/main/com/deftlabs/lock/mongo/DistributedLock.java
+++ b/src/main/com/deftlabs/lock/mongo/DistributedLock.java
@@ -28,10 +28,15 @@
public interface DistributedLock extends Lock {
/**
- * Returns true if the lock is currently locked by the local process.
+ * Returns true if lock's held by this JVM / process.
*/
public boolean isLocked();
+ /**
+ * Returns true if the lock is currently locked by any process.
+ */
+ public boolean isDistributedLocked();
+
/**
* Returns the lock name.
*/
diff --git a/src/main/com/deftlabs/lock/mongo/DistributedLockSvc.java b/src/main/com/deftlabs/lock/mongo/DistributedLockSvc.java
index b2e25c9..e68bdcb 100644
--- a/src/main/com/deftlabs/lock/mongo/DistributedLockSvc.java
+++ b/src/main/com/deftlabs/lock/mongo/DistributedLockSvc.java
@@ -53,5 +53,9 @@ public interface DistributedLockSvc {
*/
public boolean isRunning();
+ /**
+ * Returns lock service options
+ */
+ public DistributedLockSvcOptions getSvcOptions();
}
diff --git a/src/main/com/deftlabs/lock/mongo/DistributedLockSvcFactory.java b/src/main/com/deftlabs/lock/mongo/DistributedLockSvcFactory.java
index 4361a02..b5514c5 100644
--- a/src/main/com/deftlabs/lock/mongo/DistributedLockSvcFactory.java
+++ b/src/main/com/deftlabs/lock/mongo/DistributedLockSvcFactory.java
@@ -17,6 +17,7 @@
package com.deftlabs.lock.mongo;
// Lib
+
import com.deftlabs.lock.mongo.impl.SvcImpl;
/**
@@ -25,29 +26,18 @@
public final class DistributedLockSvcFactory {
public DistributedLockSvcFactory(final DistributedLockSvcOptions pOptions) {
- _options = pOptions;
+ _options = pOptions.copy();
}
/**
- * Returns the global lock service. This method also calls the startup method on the
- * lock service returned (when it is created).
+ * Returns the lock service for the specific lock options if one exists.
+ * Otherwise if this is the first time these options have been seen, creates
+ * a new lock service implementation.
*/
public DistributedLockSvc getLockSvc() {
- if (_lockSvc != null && _lockSvc.isRunning()) return _lockSvc;
-
- synchronized(_mutex) {
- if (_lockSvc != null && _lockSvc.isRunning()) return _lockSvc;
-
- final SvcImpl svc = new SvcImpl(_options);
- svc.startup();
- _lockSvc = svc;
- return _lockSvc;
- }
+ return SvcImpl.create(_options);
}
- private static volatile DistributedLockSvc _lockSvc = null;
-
private final DistributedLockSvcOptions _options;
- private final static Object _mutex = new Object();
}
diff --git a/src/main/com/deftlabs/lock/mongo/DistributedLockSvcOptions.java b/src/main/com/deftlabs/lock/mongo/DistributedLockSvcOptions.java
index 529008a..966f7b4 100644
--- a/src/main/com/deftlabs/lock/mongo/DistributedLockSvcOptions.java
+++ b/src/main/com/deftlabs/lock/mongo/DistributedLockSvcOptions.java
@@ -107,6 +107,54 @@ public DistributedLockSvcOptions( final String pMongoUri,
public void setHistoryCollectionName(final String pV) { _historyCollectionName = pV; }
public String getHistoryCollectionName() { return _historyCollectionName; }
+ public DistributedLockSvcOptions copy() {
+ DistributedLockSvcOptions copy = new DistributedLockSvcOptions( _mongoUri, _dbName, _collectionName, _appName );
+
+ copy._historyCollectionName = this._historyCollectionName;
+ copy._hostname = this._hostname;
+ copy._hostAddress = this._hostAddress;
+ copy._enableHistory = this._enableHistory;
+ copy._historyIsCapped = this._historyIsCapped;
+ copy._historySize = this._historySize;
+
+ return copy;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if ( this == o ) return true;
+ if ( o == null || getClass() != o.getClass() ) return false;
+
+ DistributedLockSvcOptions that = (DistributedLockSvcOptions) o;
+
+ if ( _appName != null ? !_appName.equals( that._appName ) : that._appName != null ) return false;
+ if ( _collectionName != null ? !_collectionName.equals( that._collectionName ) : that._collectionName != null )
+ return false;
+ if ( _dbName != null ? !_dbName.equals( that._dbName ) : that._dbName != null ) return false;
+ if ( _mongoUri != null ? !_mongoUri.equals( that._mongoUri ) : that._mongoUri != null ) return false;
+
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ int result = _mongoUri != null ? _mongoUri.hashCode() : 0;
+ result = 31 * result + ( _dbName != null ? _dbName.hashCode() : 0 );
+ result = 31 * result + ( _collectionName != null ? _collectionName.hashCode() : 0 );
+ result = 31 * result + ( _appName != null ? _appName.hashCode() : 0 );
+ return result;
+ }
+
+ @Override
+ public String toString() {
+ return "DistributedLockSvcOptions{" +
+ "_collectionName='" + _collectionName + '\'' +
+ ", _appName='" + _appName + '\'' +
+ ", _dbName='" + _dbName + '\'' +
+ ", _mongoUri='" + _mongoUri + '\'' +
+ '}';
+ }
+
private final String _mongoUri;
private final String _dbName;
private final String _collectionName;
diff --git a/src/main/com/deftlabs/lock/mongo/impl/LockImpl.java b/src/main/com/deftlabs/lock/mongo/impl/LockImpl.java
index eb63fcd..6b4c045 100644
--- a/src/main/com/deftlabs/lock/mongo/impl/LockImpl.java
+++ b/src/main/com/deftlabs/lock/mongo/impl/LockImpl.java
@@ -28,7 +28,6 @@
// Java
import java.util.Queue;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.ConcurrentLinkedQueue;
@@ -131,6 +130,16 @@ private boolean park(final long pNanos) {
return locked;
}
+ @Override
+ protected void finalize() throws Throwable {
+ super.finalize();
+
+ // only destroy if not already cleaned up by user
+ synchronized (this) {
+ if ( _running.get() ) { destroy(); }
+ }
+ }
+
/**
* Try and lock the distributed lock.
*/
@@ -189,25 +198,29 @@ private boolean tryDistributedLock() {
* Called to initialize the lock.
*/
synchronized void init() {
- if (_running.get()) throw new IllegalStateException("init already called");
- _running.set(true);
+ if (!_running.compareAndSet(false, true)) throw new IllegalStateException("init already called");
}
/**
* Called to destroy the lock.
*/
synchronized void destroy() {
- if (!_running.get()) throw new IllegalStateException("destroy already called");
- _running.set(false);
+ if (!_running.compareAndSet(true, false)) throw new IllegalStateException("destroy already called");
for (final Thread t : _waitingThreads) t.interrupt();
_waitingThreads.clear();
}
/**
- * Returns true if the lock is currently locked.
+ * Returns true if lock's held by this JVM / process.
*/
@Override public boolean isLocked() { return _locked.get(); }
+ /**
+ * Returns true if the lock is currently locked by any process.
+ */
+ @Override public boolean isDistributedLocked()
+ { return isLocked() || LockDao.isLocked(_mongo, _name, _svcOptions); }
+
/**
* Returns the lock name.
*/
diff --git a/src/main/com/deftlabs/lock/mongo/impl/Monitor.java b/src/main/com/deftlabs/lock/mongo/impl/Monitor.java
index 8f76601..8307945 100644
--- a/src/main/com/deftlabs/lock/mongo/impl/Monitor.java
+++ b/src/main/com/deftlabs/lock/mongo/impl/Monitor.java
@@ -25,7 +25,7 @@
import org.bson.types.ObjectId;
// Java
-import java.util.Map;
+import java.util.*;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -41,8 +41,9 @@ final class Monitor {
* closed properly (based on the lock/unlock) contract. This can happen when processes
* die unexpectedly (e.g., out of memory) or when they are not stopped properly (e.g., kill -9).
*/
- static class LockHeartbeat implements Runnable {
- @Override public void run() {
+ static class LockHeartbeat extends Thread {
+ @Override
+ public void run() {
while (_running) {
try {
for (final String lockName : _locks.keySet()) {
@@ -66,9 +67,11 @@ static class LockHeartbeat implements Runnable {
final DistributedLockSvcOptions pSvcOptions,
final Map pLocks)
{
+ super("MongoDistributedLock-Heartbeat-" + pMongo.getConnectPoint());
_mongo = pMongo;
_svcOptions = pSvcOptions;
_locks = pLocks;
+ setDaemon(true);
}
private static final long HEARTBEAT_FREQUENCY = 5000;
@@ -85,8 +88,9 @@ static class LockHeartbeat implements Runnable {
* timeout thread runs in each process this lock lib is running. This thread is
* responsible for cleaning up expired locks (based on time since last heartbeat).
*/
- static class LockTimeout implements Runnable {
- @Override public void run() {
+ static class LockTimeout extends Thread {
+ @Override
+ public void run() {
while (_running) {
try {
@@ -101,8 +105,10 @@ static class LockTimeout implements Runnable {
LockTimeout(final Mongo pMongo,
final DistributedLockSvcOptions pSvcOptions)
{
+ super("MongoDistributedLock-LockTimeout-" + pMongo.getConnectPoint());
_mongo = pMongo;
_svcOptions = pSvcOptions;
+ setDaemon(true);
}
private static final long CHECK_FREQUENCY = 60000;
@@ -118,17 +124,15 @@ static class LockTimeout implements Runnable {
* The lock unlocked thread is responsible for waking up local
* threads when a lock state changes.
*/
- static class LockUnlocked implements Runnable {
- @Override public void run() {
+ static class LockUnlocked extends Thread {
+ @Override
+ public void run() {
while (_running) {
try {
for (final String lockName : _locks.keySet()) {
final DistributedLock lock = _locks.get(lockName);
- if (lock.isLocked()) continue;
-
- // Check to see if this is locked.
- if (LockDao.isLocked(_mongo, lockName, _svcOptions)) continue;
+ if (lock.isDistributedLocked()) continue;
// The lock is not locked, wakeup any blocking threads.
lock.wakeupBlocked();
@@ -144,9 +148,11 @@ static class LockUnlocked implements Runnable {
final DistributedLockSvcOptions pSvcOptions,
final Map pLocks)
{
+ super("MongoDistributedLock-LockUnlocked-" + pMongo.getConnectPoint());
_mongo = pMongo;
_svcOptions = pSvcOptions;
_locks = pLocks;
+ setDaemon(true);
}
private static final long FREQUENCY = 1000;
diff --git a/src/main/com/deftlabs/lock/mongo/impl/SvcImpl.java b/src/main/com/deftlabs/lock/mongo/impl/SvcImpl.java
index 66baba7..01fffd4 100644
--- a/src/main/com/deftlabs/lock/mongo/impl/SvcImpl.java
+++ b/src/main/com/deftlabs/lock/mongo/impl/SvcImpl.java
@@ -28,6 +28,7 @@
import com.mongodb.MongoURI;
// Java
+import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;
@@ -38,10 +39,15 @@
*/
public final class SvcImpl implements DistributedLockSvc {
- public SvcImpl(final DistributedLockSvcOptions pOptions) {
+ private SvcImpl(final DistributedLockSvcOptions pOptions) {
_options = pOptions;
}
+ @Override
+ public DistributedLockSvcOptions getSvcOptions() {
+ return _options.copy();
+ }
+
/**
* Returns a new lock. If the name is already in use and the lock has been
* created, it returns the existing lock.
@@ -77,9 +83,8 @@ public DistributedLock create(final String pLockName) {
@Override
public void destroy(final DistributedLock pLock) {
+ _lock.lock();
try {
- _lock.lock();
-
if (!_locks.containsKey(pLock.getName()))
{ throw new DistributedLockException("Lock has already been destroyed: " + pLock.getName()); }
@@ -97,9 +102,8 @@ public void destroy(final DistributedLock pLock) {
*/
@Override
public void startup() {
- _running.set(true);
+ _lock.lock();
try {
- _lock.lock();
_mongo = new Mongo(new MongoURI(_options.getMongoUri()));
@@ -109,15 +113,15 @@ public void startup() {
// Init the monitor threads.
_lockHeartbeat = new Monitor.LockHeartbeat(_mongo, _options, _locks);
- (new Thread(_lockHeartbeat)).start();
+ _lockHeartbeat.start();
_lockTimeout = new Monitor.LockTimeout(_mongo, _options);
- (new Thread(_lockTimeout)).start();
+ _lockTimeout.start();
_lockUnlocked = new Monitor.LockUnlocked(_mongo, _options, _locks);
- (new Thread(_lockUnlocked)).start();
-
+ _lockUnlocked.start();
+ _running.set( true );
} catch (final Throwable t) { throw new DistributedLockException(t);
} finally { _lock.unlock(); }
}
@@ -127,12 +131,11 @@ public void startup() {
*/
@Override
public void shutdown() {
-
- if (!_running.get()) throw new IllegalStateException("shutdown called but not running");
- _running.set(false);
-
+ _lock.lock();
try {
- _lock.lock();
+
+ // Performed while holding lock for finalize behavior
+ if (!_running.compareAndSet(true, false)) throw new IllegalStateException("shutdown called but not running");
// Interrupt the locks.
for (final String lockName : _locks.keySet()) {
@@ -146,10 +149,24 @@ public void shutdown() {
_lockHeartbeat.stopRunning();
_lockUnlocked.stopRunning();
+ destroy( _options );
} catch (final Throwable t) { throw new DistributedLockException(t);
} finally { _lock.unlock(); }
}
+ @Override
+ protected void finalize() throws Throwable {
+ super.finalize();
+
+ _lock.lock();
+ try {
+ // only shutdown if not already cleaned up by user
+ if ( _running.get() ) { shutdown(); }
+ } finally {
+ _lock.unlock();
+ }
+ }
+
@Override
public boolean isRunning() { return _running.get(); }
@@ -166,5 +183,32 @@ public void shutdown() {
private final Map _locks = new ConcurrentHashMap();
+ private static final Map _lockSvcs
+ = new HashMap();
+
+ private static void destroy(final DistributedLockSvcOptions pOptions) {
+ if ( _lockSvcs.remove(pOptions) == null ) {
+ throw new IllegalStateException( "lock service already unregistered for: " + pOptions );
+ }
+ }
+
+ public static DistributedLockSvc create(final DistributedLockSvcOptions pOptions) {
+ DistributedLockSvc svc = _lockSvcs.get(pOptions);
+ if ( svc != null && svc.isRunning() ) { return svc; }
+
+ synchronized(SvcImpl.class) {
+ svc = _lockSvcs.get(pOptions);
+ if ( svc != null && svc.isRunning() ) { return svc; }
+
+ svc = new SvcImpl(pOptions);
+ svc.startup();
+ if ( _lockSvcs.put( pOptions, svc ) != null ) {
+ throw new IllegalStateException( "lock service already registered for: " + pOptions );
+ }
+
+ return svc;
+ }
+ }
+
}
diff --git a/src/test/com/deftlabs/lock/mongo/FactoryIntTests.java b/src/test/com/deftlabs/lock/mongo/FactoryIntTests.java
new file mode 100644
index 0000000..8311c07
--- /dev/null
+++ b/src/test/com/deftlabs/lock/mongo/FactoryIntTests.java
@@ -0,0 +1,46 @@
+package com.deftlabs.lock.mongo;
+
+import org.junit.Ignore;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+
+/**
+ * Tests service creation semantics. You must be running an instance of mongo on localhost:27017 and localhost:27018
+ */
+public final class FactoryIntTests {
+
+ private static final String MONGO_URI1 = "mongodb://127.0.0.1:27017/?maxpoolsize=10&waitqueuemultiple=5&connecttimeoutms=20000&sockettimeoutms=20000&autoconnectretry=true";
+ private static final String MONGO_URI2 = "mongodb://127.0.0.1:27018/?maxpoolsize=10&waitqueuemultiple=5&connecttimeoutms=20000&sockettimeoutms=20000&autoconnectretry=true";
+
+ @Test
+ public void factoryMaintainsSingleInstance() throws Exception {
+ final DistributedLockSvcOptions options1 = new DistributedLockSvcOptions(MONGO_URI1);
+ final DistributedLockSvc svc1 = new DistributedLockSvcFactory(options1).getLockSvc();
+
+ final DistributedLockSvcOptions options2 = new DistributedLockSvcOptions(MONGO_URI1);
+ final DistributedLockSvc svc2 = new DistributedLockSvcFactory(options2).getLockSvc();
+
+ assertSame(svc1, svc2);
+ assertEquals(svc1, svc2);
+ }
+
+ @Test public void canCreateSeparatelyConfiguredServices() throws Exception {
+ final DistributedLockSvcOptions options1 = new DistributedLockSvcOptions(MONGO_URI1);
+ final DistributedLockSvc svc1 = new DistributedLockSvcFactory(options1).getLockSvc();
+
+ final DistributedLockSvcOptions options2 = new DistributedLockSvcOptions(MONGO_URI2);
+ final DistributedLockSvc svc2 = new DistributedLockSvcFactory(options2).getLockSvc();
+
+ assertEquals(MONGO_URI1, svc1.getSvcOptions().getMongoUri());
+ assertEquals(MONGO_URI2, svc2.getSvcOptions().getMongoUri());
+ }
+
+ @Ignore
+ @Test(expected = IllegalArgumentException.class)
+ public void cannotRedefineMongoOptionsForSameHosts() throws Exception {
+ // TODO
+ }
+
+}