class Kubeclient::Config

Kubernetes client configuration class

Public Class Methods

new(data, kcfg_path) click to toggle source

data (Hash) - Parsed kubeconfig data. kcfg_path (String) - Base directory for resolving relative references to external files.

If kcfg_path is nil, all external lookups and commands are disabled (even for absolute paths).

See also the more convenient Config.read

# File lib/kubeclient/config.rb, line 25
# Wraps already-parsed kubeconfig data.
#
# data      - parsed kubeconfig Hash; its 'apiVersion' entry must be 'v1'.
# kcfg_path - base directory used to resolve relative external references;
#             nil disables external lookups (see allow_external_lookups?).
#
# Raises RuntimeError when 'apiVersion' is anything other than 'v1'.
def initialize(data, kcfg_path)
  @kcfg = data
  @kcfg_path = kcfg_path
  return if data['apiVersion'] == 'v1'

  raise 'Unknown kubeconfig version'
end
read(filename) click to toggle source

Builds a Config instance by parsing the given file, resolving external lookups relative to the file's directory.

# File lib/kubeclient/config.rb, line 32
# Parses the kubeconfig YAML file at +filename+ and wraps it in a Config
# whose base directory for external lookups is the file's own directory.
#
# Date and Time are the only non-basic classes YAML.safe_load may build.
def self.read(filename)
  yaml_text = File.read(filename)
  allowed_classes = [Date, Time]
  kubeconfig =
    if RUBY_VERSION < '2.6'
      # Older rubies take the permitted classes as a positional argument.
      YAML.safe_load(yaml_text, allowed_classes)
    else
      YAML.safe_load(yaml_text, permitted_classes: allowed_classes)
    end
  Config.new(kubeconfig, File.dirname(filename))
end

Public Instance Methods

context(context_name = nil) click to toggle source
# File lib/kubeclient/config.rb, line 46
# Builds the connection settings for one context of the kubeconfig.
#
# context_name - name of the context to load; when nil, the kubeconfig's
#                'current-context' entry is used.
#
# Returns a Context built from the cluster's 'server', the kubeconfig
# 'apiVersion', the SSL options, the auth options and the namespace.
# Errors raised by fetch_context (unknown context/cluster) propagate.
def context(context_name = nil)
  cluster, user, namespace = fetch_context(context_name || @kcfg['current-context'])

  if user.key?('exec')
    exec_opts = expand_command_option(user['exec'], 'command')
    # +user+ is a live reference into the kubeconfig Hash handed to the
    # constructor. Attach the plugin output to a copy so the credentials
    # are never written back into the caller's data.
    user = user.merge('exec_result' => ExecCredentials.run(exec_opts))
  end

  client_cert_data = fetch_user_cert_data(user)
  client_key_data  = fetch_user_key_data(user)
  auth_options     = fetch_user_auth_options(user)

  ssl_options = {}

  # Only a literal boolean true disables peer verification.
  ssl_options[:verify_ssl] = if cluster['insecure-skip-tls-verify'] == true
                               OpenSSL::SSL::VERIFY_NONE
                             else
                               OpenSSL::SSL::VERIFY_PEER
                             end

  if cluster_ca_data?(cluster)
    cert_store = OpenSSL::X509::Store.new
    populate_cert_store_from_cluster_ca_data(cluster, cert_store)
    ssl_options[:cert_store] = cert_store
  end

  unless client_cert_data.nil?
    ssl_options[:client_cert] = OpenSSL::X509::Certificate.new(client_cert_data)
  end

  unless client_key_data.nil?
    ssl_options[:client_key] = OpenSSL::PKey.read(client_key_data)
  end

  Context.new(cluster['server'], @kcfg['apiVersion'], ssl_options, auth_options, namespace)
end
contexts() click to toggle source
# File lib/kubeclient/config.rb, line 42
# Lists the names of the contexts defined in the kubeconfig.
#
# Returns an Array holding the 'name' of every 'contexts' entry. A missing
# or null 'contexts' list yields an empty Array instead of a NoMethodError.
def contexts
  (@kcfg['contexts'] || []).map { |entry| entry['name'] }
end

Private Instance Methods

allow_external_lookups?() click to toggle source
# File lib/kubeclient/config.rb, line 85
# True unless the config was built with a nil base directory, which is the
# switch that turns off external file and command lookups.
def allow_external_lookups?
  !@kcfg_path.nil?
end
cluster_ca_data?(cluster) click to toggle source
# File lib/kubeclient/config.rb, line 136
# Tells whether the cluster entry names a CA, either as a file reference
# ('certificate-authority') or inline ('certificate-authority-data').
# Only key presence is checked, not the value.
def cluster_ca_data?(cluster)
  %w[certificate-authority certificate-authority-data].any? { |ca_key| cluster.key?(ca_key) }
end
expand_command_option(config, key) click to toggle source
# File lib/kubeclient/config.rb, line 195
# Returns a shallow copy of +config+ in which the command stored under +key+
# (if any) has been passed through ext_command_path. The Hash given by the
# caller is left untouched.
def expand_command_option(config, key)
  expanded = config.dup
  command = expanded[key]
  expanded[key] = ext_command_path(command) if command
  expanded
end
ext_command_path(path) click to toggle source
# File lib/kubeclient/config.rb, line 96
# Resolves the path of an external command named in the kubeconfig.
#
# Raises RuntimeError when external lookups are disabled.
def ext_command_path(path)
  if allow_external_lookups?
    # Same three cases as the go client
    # (https://github.com/kubernetes/kubernetes/pull/59495#discussion_r171138995):
    # - absolute path (e.g. /path/to/foo)    -> used as-is
    # - bare name found via $PATH (e.g. curl) -> used as-is
    # - anything else (e.g. ./foo)            -> relative to the config file's dir
    return path if Pathname(path).absolute? || File.basename(path) == path

    File.join(@kcfg_path, path)
  else
    raise "Kubeclient::Config: external lookups disabled, can't execute '#{path}'"
  end
end
ext_file_path(path) click to toggle source
# File lib/kubeclient/config.rb, line 89
# Resolves the path of an external file named in the kubeconfig: absolute
# paths are returned unchanged, anything else is joined onto the config's
# base directory.
#
# Raises RuntimeError when external lookups are disabled.
def ext_file_path(path)
  if allow_external_lookups?
    return path if Pathname(path).absolute?

    File.join(@kcfg_path, path)
  else
    raise "Kubeclient::Config: external lookups disabled, can't load '#{path}'"
  end
end
fetch_context(context_name) click to toggle source
# File lib/kubeclient/config.rb, line 114
# Resolves a context name to the settings it points at.
#
# context_name - value matched against the 'name' of each 'contexts' entry.
#
# Returns [cluster, user, namespace]: the 'cluster' Hash of the cluster the
# context names, the 'user' Hash of the user it names ({} when no such user
# exists) and the context's 'namespace' value.
# Raises KeyError when the context, or the cluster it names, is not found.
# Missing or null 'contexts' / 'clusters' / 'users' lists are treated as
# empty rather than crashing with NoMethodError.
def fetch_context(context_name)
  context_entry = (@kcfg['contexts'] || []).find { |entry| entry['name'] == context_name }
  context = context_entry && context_entry['context']
  raise KeyError, "Unknown context #{context_name}" unless context

  cluster_entry = (@kcfg['clusters'] || []).find { |entry| entry['name'] == context['cluster'] }
  cluster = cluster_entry && cluster_entry['cluster']
  raise KeyError, "Unknown cluster #{context['cluster']}" unless cluster

  # The user is optional: fall back to an empty Hash when it cannot be found.
  user_entry = (@kcfg['users'] || []).find { |entry| entry['name'] == context['user'] }
  user = (user_entry && user_entry['user']) || {}

  [cluster, user, context['namespace']]
end
fetch_token_from_provider(auth_provider) click to toggle source
# File lib/kubeclient/config.rb, line 185
# Obtains a bearer token from the auth provider described by +auth_provider+.
#
# 'gcp'  - the provider config goes through expand_command_option for its
#          'cmd-path' entry and is then handed to GCPAuthProvider.token.
# 'oidc' - the provider config is handed to OIDCAuthProvider.token as-is.
#
# Returns the provider's token, or nil for any other provider name.
def fetch_token_from_provider(auth_provider)
  provider = auth_provider['name']
  if provider == 'gcp'
    gcp_config = expand_command_option(auth_provider['config'], 'cmd-path')
    Kubeclient::GCPAuthProvider.token(gcp_config)
  elsif provider == 'oidc'
    Kubeclient::OIDCAuthProvider.token(auth_provider['config'])
  end
end
fetch_user_auth_options(user) click to toggle source
# File lib/kubeclient/config.rb, line 169
# Derives the authentication options for a kubeconfig user entry.
#
# The first source present wins, in this order: a static 'token', a 'token'
# inside 'exec_result', an 'auth-provider', and finally whichever of
# 'username' / 'password' are set.
#
# Returns a Hash with :bearer_token, or with :username and/or :password,
# or an empty Hash when the user carries none of these.
def fetch_user_auth_options(user)
  return { bearer_token: user['token'] } if user.key?('token')

  if user.key?('exec_result') && user['exec_result'].key?('token')
    return { bearer_token: user['exec_result']['token'] }
  end

  if user.key?('auth-provider')
    return { bearer_token: fetch_token_from_provider(user['auth-provider']) }
  end

  %w[username password].each_with_object({}) do |field, basic_auth|
    basic_auth[field.to_sym] = user[field] if user.key?(field)
  end
end
fetch_user_cert_data(user) click to toggle source
# File lib/kubeclient/config.rb, line 149
# Looks up the user's client certificate.
#
# Sources, first present wins: the file named by 'client-certificate', the
# Base64-encoded 'client-certificate-data', then 'clientCertificateData'
# inside 'exec_result'.
#
# Returns the certificate data as a String, or nil when no source is present.
def fetch_user_cert_data(user)
  return File.read(ext_file_path(user['client-certificate'])) if user.key?('client-certificate')
  return Base64.decode64(user['client-certificate-data']) if user.key?('client-certificate-data')

  from_exec = user.key?('exec_result') && user['exec_result'].key?('clientCertificateData')
  user['exec_result']['clientCertificateData'] if from_exec
end
fetch_user_key_data(user) click to toggle source
# File lib/kubeclient/config.rb, line 159
# Looks up the user's client private key.
#
# Sources, first present wins: the file named by 'client-key', the
# Base64-encoded 'client-key-data', then 'clientKeyData' inside
# 'exec_result'.
#
# Returns the key data as a String, or nil when no source is present.
def fetch_user_key_data(user)
  return File.read(ext_file_path(user['client-key'])) if user.key?('client-key')
  return Base64.decode64(user['client-key-data']) if user.key?('client-key-data')

  from_exec = user.key?('exec_result') && user['exec_result'].key?('clientKeyData')
  user['exec_result']['clientKeyData'] if from_exec
end
populate_cert_store_from_cluster_ca_data(cluster, cert_store) click to toggle source
# File lib/kubeclient/config.rb, line 140
# Adds the cluster's CA to +cert_store+.
#
# A 'certificate-authority' file reference takes precedence and is added
# with add_file; otherwise Base64-encoded 'certificate-authority-data' is
# decoded, parsed as an X509 certificate and added with add_cert. When
# neither key is present the store is left untouched and nil is returned.
def populate_cert_store_from_cluster_ca_data(cluster, cert_store)
  if cluster.key?('certificate-authority')
    ca_path = ext_file_path(cluster['certificate-authority'])
    return cert_store.add_file(ca_path)
  end
  return unless cluster.key?('certificate-authority-data')

  ca_cert = OpenSSL::X509::Certificate.new(Base64.decode64(cluster['certificate-authority-data']))
  cert_store.add_cert(ca_cert)
end