diff --git a/flume-ng-doc/sphinx/FlumeDeveloperGuide.rst b/flume-ng-doc/sphinx/FlumeDeveloperGuide.rst index e3b60e6701..f948778e63 100644 --- a/flume-ng-doc/sphinx/FlumeDeveloperGuide.rst +++ b/flume-ng-doc/sphinx/FlumeDeveloperGuide.rst @@ -277,6 +277,116 @@ properties: request-timeout = 20000 # Must be >=1000 (default: 20000) +Secure RPC client - Thrift +'''''''''''''''''''''''''' + +As of Flume 1.6.0, Thrift source and sink supports kerberos based authentication. +The client needs to use the getThriftInstance method of ``SecureRpcClientFactory`` +to get hold of a ``SecureThriftRpcClient``. ``SecureThriftRpcClient`` extends +``ThriftRpcClient`` which implements the ``RpcClient`` interface. The kerberos +authentication module resides in flume-ng-auth module which is +required in classpath, when using the ``SecureRpcClientFactory``. Both the client +principal and the client keytab should be passed in as parameters through the +properties and they reflect the credentials of the client to authenticate +against the kerberos KDC. In addition, the server principal of the destination +Thrift source to which this client is connecting to, should also be provided. +The following example shows how to use the ``SecureRpcClientFactory`` +within a user's data-generating application: + +.. code-block:: java + + import org.apache.flume.Event; + import org.apache.flume.EventDeliveryException; + import org.apache.flume.event.EventBuilder; + import org.apache.flume.api.SecureRpcClientFactory; + import org.apache.flume.api.RpcClientConfigurationConstants; + import org.apache.flume.api.RpcClient; + import java.nio.charset.Charset; + import java.util.Properties; + + public class MyApp { + public static void main(String[] args) { + MySecureRpcClientFacade client = new MySecureRpcClientFacade(); + // Initialize client with the remote Flume agent's host, port + Properties props = new Properties(); + props.setProperty(RpcClientConfigurationConstants.CONFIG_CLIENT_TYPE, "thrift"); + props.setProperty("hosts", "h1"); + props.setProperty("hosts.h1", "client.example.org"+":"+ String.valueOf(41414)); + + // Initialize client with the kerberos authentication related properties + props.setProperty("kerberos", "true"); + props.setProperty("client-principal", "flumeclient/client.example.org@EXAMPLE.ORG"); + props.setProperty("client-keytab", "/tmp/flumeclient.keytab"); + props.setProperty("server-principal", "flume/server.example.org@EXAMPLE.ORG"); + client.init(props); + + // Send 10 events to the remote Flume agent. That agent should be + // configured to listen with an AvroSource. + String sampleData = "Hello Flume!"; + for (int i = 0; i < 10; i++) { + client.sendDataToFlume(sampleData); + } + + client.cleanUp(); + } + } + + class MySecureRpcClientFacade { + private RpcClient client; + private Properties properties; + + public void init(Properties properties) { + // Setup the RPC connection + this.properties = properties; + // Create the ThriftSecureRpcClient instance by using SecureRpcClientFactory + this.client = SecureRpcClientFactory.getThriftInstance(properties); + } + + public void sendDataToFlume(String data) { + // Create a Flume Event object that encapsulates the sample data + Event event = EventBuilder.withBody(data, Charset.forName("UTF-8")); + + // Send the event + try { + client.append(event); + } catch (EventDeliveryException e) { + // clean up and recreate the client + client.close(); + client = null; + client = SecureRpcClientFactory.getThriftInstance(properties); + } + } + + public void cleanUp() { + // Close the RPC connection + client.close(); + } + } + +The remote ``ThriftSource`` should be started in kerberos mode. +Below is an example Flume agent configuration that's waiting for a connection +from MyApp: + +.. code-block:: properties + + a1.channels = c1 + a1.sources = r1 + a1.sinks = k1 + + a1.channels.c1.type = memory + + a1.sources.r1.channels = c1 + a1.sources.r1.type = thrift + a1.sources.r1.bind = 0.0.0.0 + a1.sources.r1.port = 41414 + a1.sources.r1.kerberos = true + a1.sources.r1.agent-principal = flume/server.example.org@EXAMPLE.ORG + a1.sources.r1.agent-keytab = /tmp/flume.keytab + + + a1.sinks.k1.channel = c1 + a1.sinks.k1.type = logger + Failover Client ''''''''''''''' @@ -459,20 +569,29 @@ full Agent. The following is an exhaustive list of configration options: Required properties are in **bold**. -==================== ================ ============================================== -Property Name Default Description -==================== ================ ============================================== -source.type embedded The only available source is the embedded source. -**channel.type** -- Either ``memory`` or ``file`` which correspond to MemoryChannel and FileChannel respectively. -channel.* -- Configuration options for the channel type requested, see MemoryChannel or FileChannel user guide for an exhaustive list. -**sinks** -- List of sink names -**sink.type** -- Property name must match a name in the list of sinks. Value must be ``avro`` -sink.* -- Configuration options for the sink. See AvroSink user guide for an exhaustive list, however note AvroSink requires at least hostname and port. -**processor.type** -- Either ``failover`` or ``load_balance`` which correspond to FailoverSinksProcessor and LoadBalancingSinkProcessor respectively. -processor.* -- Configuration options for the sink processor selected. See FailoverSinksProcessor and LoadBalancingSinkProcessor user guide for an exhaustive list. -source.interceptors -- Space-separated list of interceptors -source.interceptors.* -- Configuration options for individual interceptors specified in the source.interceptors property -==================== ================ ============================================== +===================== ================ ====================================================================== +Property Name Default Description +===================== ================ ====================================================================== +source.type embedded The only available source is the embedded source. +**channel.type** -- Either ``memory`` or ``file`` which correspond + to MemoryChannel and FileChannel respectively. +channel.* -- Configuration options for the channel type requested, + see MemoryChannel or FileChannel user guide for an exhaustive list. +**sinks** -- List of sink names +**sink.type** -- Property name must match a name in the list of sinks. + Value must be ``avro`` +sink.* -- Configuration options for the sink. + See AvroSink user guide for an exhaustive list, + however note AvroSink requires at least hostname and port. +**processor.type** -- Either ``failover`` or ``load_balance`` which correspond + to FailoverSinksProcessor and LoadBalancingSinkProcessor respectively. +processor.* -- Configuration options for the sink processor selected. + See FailoverSinksProcessor and LoadBalancingSinkProcessor + user guide for an exhaustive list. +source.interceptors -- Space-separated list of interceptors +source.interceptors.* -- Configuration options for individual interceptors + specified in the source.interceptors property +===================== ================ ====================================================================== Below is an example of how to use the agent: