forked from apache/logging-flume
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
FLUME-2668. Document SecureThriftRpcClient/SecureRpcClientFactory in …
…Flume Developer Guide (Johny Rufus via Hari)
- Loading branch information
1 parent
be4ae29
commit a508d95
Showing
1 changed file
with
133 additions
and
14 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -277,6 +277,116 @@ properties: | |
request-timeout = 20000 # Must be >=1000 (default: 20000) | ||
Secure RPC client - Thrift | ||
'''''''''''''''''''''''''' | ||
|
||
As of Flume 1.6.0, Thrift source and sink supports kerberos based authentication. | ||
The client needs to use the getThriftInstance method of ``SecureRpcClientFactory`` | ||
to get hold of a ``SecureThriftRpcClient``. ``SecureThriftRpcClient`` extends | ||
``ThriftRpcClient`` which implements the ``RpcClient`` interface. The kerberos | ||
authentication module resides in flume-ng-auth module which is | ||
required in classpath, when using the ``SecureRpcClientFactory``. Both the client | ||
principal and the client keytab should be passed in as parameters through the | ||
properties and they reflect the credentials of the client to authenticate | ||
against the kerberos KDC. In addition, the server principal of the destination | ||
Thrift source to which this client is connecting to, should also be provided. | ||
The following example shows how to use the ``SecureRpcClientFactory`` | ||
within a user's data-generating application: | ||
|
||
.. code-block:: java | ||
import org.apache.flume.Event; | ||
import org.apache.flume.EventDeliveryException; | ||
import org.apache.flume.event.EventBuilder; | ||
import org.apache.flume.api.SecureRpcClientFactory; | ||
import org.apache.flume.api.RpcClientConfigurationConstants; | ||
import org.apache.flume.api.RpcClient; | ||
import java.nio.charset.Charset; | ||
import java.util.Properties; | ||
public class MyApp { | ||
public static void main(String[] args) { | ||
MySecureRpcClientFacade client = new MySecureRpcClientFacade(); | ||
// Initialize client with the remote Flume agent's host, port | ||
Properties props = new Properties(); | ||
props.setProperty(RpcClientConfigurationConstants.CONFIG_CLIENT_TYPE, "thrift"); | ||
props.setProperty("hosts", "h1"); | ||
props.setProperty("hosts.h1", "client.example.org"+":"+ String.valueOf(41414)); | ||
// Initialize client with the kerberos authentication related properties | ||
props.setProperty("kerberos", "true"); | ||
props.setProperty("client-principal", "flumeclient/[email protected]"); | ||
props.setProperty("client-keytab", "/tmp/flumeclient.keytab"); | ||
props.setProperty("server-principal", "flume/[email protected]"); | ||
client.init(props); | ||
// Send 10 events to the remote Flume agent. That agent should be | ||
// configured to listen with an AvroSource. | ||
String sampleData = "Hello Flume!"; | ||
for (int i = 0; i < 10; i++) { | ||
client.sendDataToFlume(sampleData); | ||
} | ||
client.cleanUp(); | ||
} | ||
} | ||
class MySecureRpcClientFacade { | ||
private RpcClient client; | ||
private Properties properties; | ||
public void init(Properties properties) { | ||
// Setup the RPC connection | ||
this.properties = properties; | ||
// Create the ThriftSecureRpcClient instance by using SecureRpcClientFactory | ||
this.client = SecureRpcClientFactory.getThriftInstance(properties); | ||
} | ||
public void sendDataToFlume(String data) { | ||
// Create a Flume Event object that encapsulates the sample data | ||
Event event = EventBuilder.withBody(data, Charset.forName("UTF-8")); | ||
// Send the event | ||
try { | ||
client.append(event); | ||
} catch (EventDeliveryException e) { | ||
// clean up and recreate the client | ||
client.close(); | ||
client = null; | ||
client = SecureRpcClientFactory.getThriftInstance(properties); | ||
} | ||
} | ||
public void cleanUp() { | ||
// Close the RPC connection | ||
client.close(); | ||
} | ||
} | ||
The remote ``ThriftSource`` should be started in kerberos mode. | ||
Below is an example Flume agent configuration that's waiting for a connection | ||
from MyApp: | ||
|
||
.. code-block:: properties | ||
a1.channels = c1 | ||
a1.sources = r1 | ||
a1.sinks = k1 | ||
a1.channels.c1.type = memory | ||
a1.sources.r1.channels = c1 | ||
a1.sources.r1.type = thrift | ||
a1.sources.r1.bind = 0.0.0.0 | ||
a1.sources.r1.port = 41414 | ||
a1.sources.r1.kerberos = true | ||
a1.sources.r1.agent-principal = flume/[email protected] | ||
a1.sources.r1.agent-keytab = /tmp/flume.keytab | ||
a1.sinks.k1.channel = c1 | ||
a1.sinks.k1.type = logger | ||
Failover Client | ||
''''''''''''''' | ||
|
||
|
@@ -459,20 +569,29 @@ full Agent. The following is an exhaustive list of configration options: | |
|
||
Required properties are in **bold**. | ||
|
||
==================== ================ ============================================== | ||
Property Name Default Description | ||
==================== ================ ============================================== | ||
source.type embedded The only available source is the embedded source. | ||
**channel.type** -- Either ``memory`` or ``file`` which correspond to MemoryChannel and FileChannel respectively. | ||
channel.* -- Configuration options for the channel type requested, see MemoryChannel or FileChannel user guide for an exhaustive list. | ||
**sinks** -- List of sink names | ||
**sink.type** -- Property name must match a name in the list of sinks. Value must be ``avro`` | ||
sink.* -- Configuration options for the sink. See AvroSink user guide for an exhaustive list, however note AvroSink requires at least hostname and port. | ||
**processor.type** -- Either ``failover`` or ``load_balance`` which correspond to FailoverSinksProcessor and LoadBalancingSinkProcessor respectively. | ||
processor.* -- Configuration options for the sink processor selected. See FailoverSinksProcessor and LoadBalancingSinkProcessor user guide for an exhaustive list. | ||
source.interceptors -- Space-separated list of interceptors | ||
source.interceptors.* -- Configuration options for individual interceptors specified in the source.interceptors property | ||
==================== ================ ============================================== | ||
===================== ================ ====================================================================== | ||
Property Name Default Description | ||
===================== ================ ====================================================================== | ||
source.type embedded The only available source is the embedded source. | ||
**channel.type** -- Either ``memory`` or ``file`` which correspond | ||
to MemoryChannel and FileChannel respectively. | ||
channel.* -- Configuration options for the channel type requested, | ||
see MemoryChannel or FileChannel user guide for an exhaustive list. | ||
**sinks** -- List of sink names | ||
**sink.type** -- Property name must match a name in the list of sinks. | ||
Value must be ``avro`` | ||
sink.* -- Configuration options for the sink. | ||
See AvroSink user guide for an exhaustive list, | ||
however note AvroSink requires at least hostname and port. | ||
**processor.type** -- Either ``failover`` or ``load_balance`` which correspond | ||
to FailoverSinksProcessor and LoadBalancingSinkProcessor respectively. | ||
processor.* -- Configuration options for the sink processor selected. | ||
See FailoverSinksProcessor and LoadBalancingSinkProcessor | ||
user guide for an exhaustive list. | ||
source.interceptors -- Space-separated list of interceptors | ||
source.interceptors.* -- Configuration options for individual interceptors | ||
specified in the source.interceptors property | ||
===================== ================ ====================================================================== | ||
|
||
Below is an example of how to use the agent: | ||
|
||
|