Skip to content

Commit

Permalink
ATLAS-4937: checkstyle compliance updates - couchbase-bridge module (#…
Browse files Browse the repository at this point in the history
…282)

(cherry picked from commit 1ec2dad)
  • Loading branch information
kumaab authored and mneethiraj committed Feb 10, 2025
1 parent d3a6c01 commit 79325d4
Show file tree
Hide file tree
Showing 13 changed files with 348 additions and 434 deletions.
4 changes: 4 additions & 0 deletions addons/couchbase-bridge/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@

<name>Apache Atlas Couchbase Bridge</name>
<description>Apache Atlas Couchbase Bridge Module</description>
<properties>
<checkstyle.failOnViolation>true</checkstyle.failOnViolation>
<checkstyle.skip>false</checkstyle.skip>
</properties>

<dependencies>
<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@

public class AtlasConfig {
private static final Map<String, String> ENV = System.getenv();
private static AtlasClientV2 client = null;
private static AtlasClientV2 client;

public static String[] urls() {
return new String[] { ENV.getOrDefault("ATLAS_URL", "http://localhost:21000") };
return new String[] {ENV.getOrDefault("ATLAS_URL", "http://localhost:21000")};
}

public static String username() {
Expand All @@ -40,6 +40,10 @@ public static String[] auth() {
return new String[] {username(), password()};
}

private AtlasConfig() {
// to block instantiation
}

public static AtlasClientV2 client() {
if (client == null) {
client = new AtlasClientV2(urls(), auth());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@ public class CBConfig {
private static Client mockDcpClient;
private static Cluster cluster;


private CBConfig() {
// to block instantiation
}

public static String address() {
return ENV.getOrDefault("CB_CLUSTER", "couchbase://localhost");
Expand Down Expand Up @@ -106,7 +108,7 @@ public static Client dcpClient() {

Client.Builder builder = Client.builder()
.collectionsAware(true)
.seedNodes(String.format("%s:%s",address(),dcpPort()))
.seedNodes(String.format("%s:%s", address(), dcpPort()))
.connectionString(address())
.credentials(username(), password());

Expand Down Expand Up @@ -142,4 +144,4 @@ protected static void dcpClient(Client mockDcpClient) {
private static boolean enableTLS() {
return Boolean.parseBoolean(ENV.getOrDefault("CB_ENABLE_TLS", "false"));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
Expand All @@ -57,13 +58,12 @@
public class CouchbaseHook extends AtlasHook implements ControlEventHandler, DataEventHandler {
private static final Logger LOG = LoggerFactory.getLogger(CouchbaseHook.class);

protected static CouchbaseHook INSTANCE;
protected static Client DCP;
protected static AtlasClientV2 ATLAS;
private static Consumer<List<AtlasEntity>> createInterceptor;
private static Consumer<List<AtlasEntity>> updateInterceptor;
private static boolean loop = true;

protected static CouchbaseHook instance;
protected static Client dcpClient;
protected static AtlasClientV2 atlasClient;
private static Consumer<List<AtlasEntity>> createInterceptor;
private static Consumer<List<AtlasEntity>> updateInterceptor;
private static boolean loop = true;

private CouchbaseCluster clusterEntity;
private CouchbaseBucket bucketEntity;
Expand All @@ -75,103 +75,51 @@ public class CouchbaseHook extends AtlasHook implements ControlEventHandler, Dat
*/
public static void main(String[] args) {
// create instances of DCP client,
DCP = CBConfig.dcpClient();
dcpClient = CBConfig.dcpClient();

// Atlas client,
ATLAS = AtlasConfig.client();
atlasClient = AtlasConfig.client();

// and DCP handler
INSTANCE = new CouchbaseHook();
instance = new CouchbaseHook();

// register DCP handler with DCP client
DCP.controlEventHandler(INSTANCE);
DCP.dataEventHandler(INSTANCE);
dcpClient.controlEventHandler(instance);
dcpClient.dataEventHandler(instance);

// Connect to the cluster
DCP.connect().block();
dcpClient.connect().block();

LOG.info("DCP client connected.");

// Ensure the existence of corresponding
// CouchbaseCluster, CouchbaseBucket, CouchbaseScope
// entities and store them in local cache
INSTANCE.initializeAtlasContext();
// Ensure the existence of corresponding CouchbaseCluster, CouchbaseBucket, CouchbaseScope entities and store them in local cache
instance.initializeAtlasContext();

// Start listening to DCP
DCP.initializeState(StreamFrom.NOW, StreamTo.INFINITY).block();
dcpClient.initializeState(StreamFrom.NOW, StreamTo.INFINITY).block();

System.out.println("Starting the stream...");
DCP.startStreaming().block();
dcpClient.startStreaming().block();

System.out.println("Started the stream.");
// And then just loop the loop
try {
while (loop) {
Thread.sleep(1000);
}
} catch (InterruptedException e) {

} catch (InterruptedException ignored) {
} finally {
DCP.disconnect().block();
dcpClient.disconnect().block();
}
}

/**
* Ensures the existence of CouchbaseCluster,
* CouchbaseBucket and Couchbase scope entities
* and stores them into local cache
*/
private void initializeAtlasContext() {
LOG.debug("Creating cluster/bucket/scope entities");

clusterEntity = new CouchbaseCluster()
.name(CBConfig.address())
.url(CBConfig.address())
.get();

bucketEntity = new CouchbaseBucket()
.name(CBConfig.bucket())
.cluster(clusterEntity)
.get();

List<AtlasEntity> entitiesToCreate = new ArrayList<>();

if (!clusterEntity.exists(ATLAS)) {
entitiesToCreate.add(clusterEntity.atlasEntity(ATLAS));
}

if (!bucketEntity.exists(ATLAS)) {
entitiesToCreate.add(bucketEntity.atlasEntity(ATLAS));
}

if (!entitiesToCreate.isEmpty()) {
createEntities(entitiesToCreate);
}
}

private void createEntities(List<AtlasEntity> entities) {
if (createInterceptor != null) {
createInterceptor.accept(entities);
return;
}

AtlasEntitiesWithExtInfo entity = new AtlasEntitiesWithExtInfo(entities);
EntityCreateRequestV2 request = new EntityCreateRequestV2("couchbase", entity);

notifyEntities(Arrays.asList(request), null);
protected static void setEntityInterceptors(Consumer<List<AtlasEntity>> createInterceptor, Consumer<List<AtlasEntity>> updateInterceptor) {
CouchbaseHook.createInterceptor = createInterceptor;
CouchbaseHook.updateInterceptor = updateInterceptor;
}

private void updateEntities(List<AtlasEntity> entities) {
if (updateInterceptor != null) {
updateInterceptor.accept(entities);

return;
}

AtlasEntitiesWithExtInfo entity = new AtlasEntitiesWithExtInfo(entities);
EntityUpdateRequestV2 request = new EntityUpdateRequestV2("couchbase", entity);

notifyEntities(Arrays.asList(request), null);
static void loop(boolean loop) {
CouchbaseHook.loop = loop;
}

@Override
Expand Down Expand Up @@ -201,28 +149,26 @@ public void onEvent(ChannelFlowController flowController, ByteBuf event) {
List<AtlasEntity> toCreate = new ArrayList<>();
List<AtlasEntity> toUpdate = new ArrayList<>();

if (!scopeEntity.exists(ATLAS)) {
toCreate.add(scopeEntity.atlasEntity(ATLAS));
if (!scopeEntity.exists(atlasClient)) {
toCreate.add(scopeEntity.atlasEntity(atlasClient));

LOG.debug("Creating scope: {}", scopeEntity.qualifiedName());
} else {
toUpdate.add(scopeEntity.atlasEntity(ATLAS));
toUpdate.add(scopeEntity.atlasEntity(atlasClient));

LOG.debug("Updating scope: {}", scopeEntity.qualifiedName());
}

CouchbaseCollection collectionEntity = scopeEntity.collection(collectionName);

// Let's record this attempt to analyze a collection document
// so that we can calculate field frequencies
// when filtering them via DCP_FIELD_THRESHOLD
// Let's record this attempt to analyze a collection document so that we can calculate field frequencies when filtering them via DCP_FIELD_THRESHOLD
collectionEntity.incrementAnalyzedDocuments();

// and then schedule it to be sent to Atlas
if (!collectionEntity.exists(ATLAS)) {
toCreate.add(collectionEntity.atlasEntity(ATLAS));
if (!collectionEntity.exists(atlasClient)) {
toCreate.add(collectionEntity.atlasEntity(atlasClient));
} else {
toUpdate.add(collectionEntity.atlasEntity(ATLAS));
toUpdate.add(collectionEntity.atlasEntity(atlasClient));
}

Map<String, Object> document = JsonObject.fromJson(DcpMutationMessage.contentBytes(event)).toMap();
Expand All @@ -237,13 +183,13 @@ public void onEvent(ChannelFlowController flowController, ByteBuf event) {
// update document counter on the field entity
.peek(CouchbaseField::incrementDocumentCount)
// Only passes fields that either already in Atlas or pass DCP_FIELD_THRESHOLD setting
.filter(field -> field.exists(ATLAS) || field.documentCount() / (float) collectionEntity.documentsAnalyzed() > CBConfig.dcpFieldThreshold())
.filter(field -> field.exists(atlasClient) || field.documentCount() / (float) collectionEntity.documentsAnalyzed() > CBConfig.dcpFieldThreshold())
// Schedule the entity either for creation or to be updated in Atlas
.forEach(field -> {
if (field.exists(ATLAS)) {
toUpdate.add(field.atlasEntity(ATLAS));
if (field.exists(atlasClient)) {
toUpdate.add(field.atlasEntity(atlasClient));
} else {
toCreate.add(field.atlasEntity(ATLAS));
toCreate.add(field.atlasEntity(atlasClient));
}
});

Expand All @@ -257,14 +203,68 @@ public void onEvent(ChannelFlowController flowController, ByteBuf event) {
}
}

@Override
public String getMessageSource() {
return "couchbase";
}

/**
* Ensures the existence of CouchbaseCluster, CouchbaseBucket and Couchbase scope entities and stores them into local cache
*/
private void initializeAtlasContext() {
LOG.debug("Creating cluster/bucket/scope entities");

clusterEntity = new CouchbaseCluster().name(CBConfig.address()).url(CBConfig.address()).get();
bucketEntity = new CouchbaseBucket().name(CBConfig.bucket()).cluster(clusterEntity).get();

List<AtlasEntity> entitiesToCreate = new ArrayList<>();

if (!clusterEntity.exists(atlasClient)) {
entitiesToCreate.add(clusterEntity.atlasEntity(atlasClient));
}

if (!bucketEntity.exists(atlasClient)) {
entitiesToCreate.add(bucketEntity.atlasEntity(atlasClient));
}

if (!entitiesToCreate.isEmpty()) {
createEntities(entitiesToCreate);
}
}

private void createEntities(List<AtlasEntity> entities) {
if (createInterceptor != null) {
createInterceptor.accept(entities);
return;
}

AtlasEntitiesWithExtInfo entity = new AtlasEntitiesWithExtInfo(entities);
EntityCreateRequestV2 request = new EntityCreateRequestV2("couchbase", entity);

notifyEntities(Arrays.asList(request), null);
}

private void updateEntities(List<AtlasEntity> entities) {
if (updateInterceptor != null) {
updateInterceptor.accept(entities);

return;
}

AtlasEntitiesWithExtInfo entity = new AtlasEntitiesWithExtInfo(entities);
EntityUpdateRequestV2 request = new EntityUpdateRequestV2("couchbase", entity);

notifyEntities(Arrays.asList(request), null);
}

/**
* Constructs a {@link CouchbaseField} from field information
*
* @param collectionEntity the {@link CouchbaseCollection} to which the field belongs
* @param path the path to the field inside the collection document excluding the field itself
* @param parent the parent field, if present or null
* @param name the name of the field
* @param value the value for the field from received document
* @param path the path to the field inside the collection document excluding the field itself
* @param parent the parent field, if present or null
* @param name the name of the field
* @param value the value for the field from received document
* @return constructed or loaded from Atlas {@link CouchbaseField}
*/
private static Stream<CouchbaseField> processField(CouchbaseCollection collectionEntity, Collection<String> path, @Nullable CouchbaseField parent, String name, Object value) {
Expand All @@ -277,13 +277,7 @@ private static Stream<CouchbaseField> processField(CouchbaseCollection collectio
fieldPath.add(name);

// constructing the field entity and loading it from cache or Atlas, if previously stored there
CouchbaseField rootField = new CouchbaseField()
.name(name)
.fieldPath(fieldPath.stream().collect(Collectors.joining(".")))
.fieldType(fieldType)
.collection(collectionEntity)
.parentField(parent)
.get();
CouchbaseField rootField = new CouchbaseField().name(name).fieldPath(fieldPath.stream().collect(Collectors.joining("."))).fieldType(fieldType).collection(collectionEntity).parentField(parent).get();

// return value
Stream<CouchbaseField> result = Stream.of(rootField);
Expand All @@ -301,8 +295,7 @@ private static Stream<CouchbaseField> processField(CouchbaseCollection collectio
result,
((Map<String, ?>) value).entrySet().stream()
// recursion
.flatMap(entity -> processField(collectionEntity, fieldPath, rootField, entity.getKey(), entity.getValue()))
);
.flatMap(entity -> processField(collectionEntity, fieldPath, rootField, entity.getKey(), entity.getValue())));
} else {
throw new IllegalArgumentException(String.format("Incorrect value type '%s' for field type 'object': a Map was expected instead.", value.getClass()));
}
Expand All @@ -311,11 +304,6 @@ private static Stream<CouchbaseField> processField(CouchbaseCollection collectio
return result;
}

@Override
public String getMessageSource() {
return "couchbase";
}

/**
* Looks up the collection name by its vbucket identifier
*
Expand All @@ -324,18 +312,6 @@ public String getMessageSource() {
* @return the name of the collection
*/
private static CollectionInfo collectionInfo(int vbucket, long collid) {
return DCP.sessionState()
.get(vbucket)
.getCollectionsManifest()
.getCollection(collid);
}

protected static void setEntityInterceptors(Consumer<List<AtlasEntity>> createInterceptor, Consumer<List<AtlasEntity>> updateInterceptor) {
CouchbaseHook.createInterceptor = createInterceptor;
CouchbaseHook.updateInterceptor = updateInterceptor;
}

static void loop(boolean loop) {
CouchbaseHook.loop = loop;
return dcpClient.sessionState().get(vbucket).getCollectionsManifest().getCollection(collid);
}
}
Loading

0 comments on commit 79325d4

Please sign in to comment.