Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ public static ZookeeperProperties getZookeeperProperties(Configuration configura
if (configuration.containsKey(HA_ZOOKEEPER_CONNECT)) {
zkServers = configuration.getStringArray(HA_ZOOKEEPER_CONNECT);
} else {
zkServers = configuration.getStringArray("atlas.kafka." + ZOOKEEPER_PREFIX + "connect");
zkServers = new String[0];
}

String zkRoot = configuration.getString(ATLAS_SERVER_HA_ZK_ROOT_KEY, ATLAS_SERVER_ZK_ROOT_DEFAULT);
Expand Down
22 changes: 22 additions & 0 deletions common/src/test/java/org/apache/atlas/ha/HAConfigurationTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@

import java.util.List;

import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
Expand Down Expand Up @@ -110,4 +112,24 @@ public void testShouldGetZookeeperAuth() {
HAConfiguration.ZookeeperProperties zookeeperProperties = HAConfiguration.getZookeeperProperties(configuration);
assertTrue(zookeeperProperties.hasAuth());
}

@Test
public void testShouldGetZookeeperConnectStringFromHAConfig() {
when(configuration.containsKey(HAConfiguration.HA_ZOOKEEPER_CONNECT)).thenReturn(true);
when(configuration.getStringArray(HAConfiguration.HA_ZOOKEEPER_CONNECT)).thenReturn(new String[] {"zk1:2181", "zk2:2181"});

HAConfiguration.ZookeeperProperties zookeeperProperties = HAConfiguration.getZookeeperProperties(configuration);

assertEquals(zookeeperProperties.getConnectString(), "zk1:2181,zk2:2181");
}

@Test
public void testShouldReturnEmptyZookeeperConnectStringWhenMissing() {
when(configuration.containsKey(HAConfiguration.HA_ZOOKEEPER_CONNECT)).thenReturn(false);

HAConfiguration.ZookeeperProperties zookeeperProperties = HAConfiguration.getZookeeperProperties(configuration);

assertEquals(zookeeperProperties.getConnectString(), "");
verify(configuration, never()).getStringArray("atlas.kafka." + HAConfiguration.ZOOKEEPER_PREFIX + "connect");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,9 @@ public CuratorFactory() throws AtlasException {
public CuratorFactory(Configuration configuration) {
this.configuration = configuration;

initializeCuratorFramework();
if (configuration.containsKey(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY) && configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, true)) {
initializeCuratorFramework();
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,9 @@ protected void initializeCuratorFramework() {

@Test
public void testClientInstance() {
when(configuration.containsKey(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY)).thenReturn(true);
when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, true)).thenReturn(true);

CuratorFactory curatorFactory = new CuratorFactory(configuration) {
@Override
protected void initializeCuratorFramework() {
Expand All @@ -154,6 +157,9 @@ protected void initializeCuratorFramework() {

@Test
public void testLeaderLatchInstance() {
when(configuration.containsKey(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY)).thenReturn(true);
when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, true)).thenReturn(true);

CuratorFactory curatorFactory = new CuratorFactory(configuration) {
@Override
protected void initializeCuratorFramework() {
Expand All @@ -176,6 +182,9 @@ protected void initializeCuratorFramework() {

@Test
public void testLockInstance() {
when(configuration.containsKey(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY)).thenReturn(true);
when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, true)).thenReturn(true);

CuratorFactory curatorFactory = new CuratorFactory(configuration) {
@Override
protected void initializeCuratorFramework() {
Expand All @@ -197,6 +206,9 @@ protected void initializeCuratorFramework() {

@Test
public void testClose() {
when(configuration.containsKey(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY)).thenReturn(true);
when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, true)).thenReturn(true);

CuratorFactory curatorFactory = new CuratorFactory(configuration) {
@Override
protected void initializeCuratorFramework() {
Expand All @@ -215,6 +227,56 @@ protected void initializeCuratorFramework() {
verify(curatorFramework).close();
}

@Test
public void testConstructorDoesNotInitializeWhenHAKeyIsMissing() {
when(configuration.containsKey(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY)).thenReturn(false);

CuratorFactory curatorFactory = new CuratorFactory(configuration) {
@Override
protected void initializeCuratorFramework() {
throw new AssertionError("initializeCuratorFramework() should not be called when HA key is missing");
}
};

assertNull(curatorFactory.clientInstance());
}

@Test
public void testConstructorDoesNotInitializeWhenHAIsDisabled() {
when(configuration.containsKey(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY)).thenReturn(true);
when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, true)).thenReturn(false);

CuratorFactory curatorFactory = new CuratorFactory(configuration) {
@Override
protected void initializeCuratorFramework() {
throw new AssertionError("initializeCuratorFramework() should not be called when HA is disabled");
}
};

assertNull(curatorFactory.clientInstance());
}

@Test
public void testConstructorInitializesWhenHAIsEnabled() {
when(configuration.containsKey(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY)).thenReturn(true);
when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, true)).thenReturn(true);

CuratorFactory curatorFactory = new CuratorFactory(configuration) {
@Override
protected void initializeCuratorFramework() {
try {
Field field = CuratorFactory.class.getDeclaredField("curatorFramework");
field.setAccessible(true);
field.set(this, curatorFramework);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
};

assertEquals(curatorFactory.clientInstance(), curatorFramework);
}

@Test
public void testGetIdForLoggingSaslScheme() throws Exception {
CuratorFactory curatorFactory = new CuratorFactory(configuration) {
Expand Down
Loading