Different types of failures in Hadoop

Karthik Sharma
Jun 15, 2021


One of the major advantages of using Hadoop is its ability to handle failures and allow jobs to complete successfully. In this article we are going to discuss the different types of failures that can occur in Hadoop and how they are handled.

Let us begin with the HDFS failures and then discuss the YARN failures. In HDFS there are two main daemons, the Namenode and the Datanode.

Namenode Failure:

The Namenode is the master node which stores metadata such as file names, the number of blocks, the number of replicas, block locations and block IDs.

In Hadoop 1.x, the Namenode is a single point of failure. Even though the metadata is persisted on disk, the time taken to recover it is very high (approximately 30 minutes). The addition of High Availability in Hadoop 2 solves this problem.

In Hadoop 2 there are two namenodes in an active-passive configuration. At any given point in time only one namenode is active, and if the active namenode goes down, the passive namenode takes over the responsibility of serving clients without any interruption.

The active and passive nodes should always be in sync with each other and must have the same metadata (i.e. the fsimage and edit logs). The namenodes must use highly available shared storage to share the edit log. When a standby namenode comes up, it reads up to the end of the shared edit log to synchronize its state with the active namenode, and then continues to read new entries as they are written by the active namenode.

Datanodes must send block reports to both namenodes because the block mappings are stored in a namenode’s memory, and not on disk. The passive node takes periodic checkpoints of the active namenode’s namespace.

One more important point to know here is fencing. Fencing is the process of ensuring that only one namenode is active at a time. Namenode failover fencing can be done in two ways. One way is to use the Quorum Journal Manager (QJM) for storing the edit logs, where only the active namenode can write edit logs to the Journal Nodes and the passive or standby node can only read them. The other way is to use shared storage, where the active node writes the edit log entries and the passive node constantly looks for changes. When a namenode fails, it is cut off from accessing this shared storage device. This way we ensure that only one namenode is active.
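To make the HA setup above concrete, here is a minimal sketch of the relevant HDFS properties set through Hadoop's Configuration API (in practice they usually live in hdfs-site.xml). The nameservice id mycluster, the namenode ids nn1/nn2 and the host names are hypothetical placeholders.

```java
import org.apache.hadoop.conf.Configuration;

public class HdfsHaConfigSketch {
    public static void main(String[] args) {
        Configuration conf = new Configuration();

        // One logical nameservice backed by two namenodes (placeholder names).
        conf.set("dfs.nameservices", "mycluster");
        conf.set("dfs.ha.namenodes.mycluster", "nn1,nn2");
        conf.set("dfs.namenode.rpc-address.mycluster.nn1", "nn-host1:8020");
        conf.set("dfs.namenode.rpc-address.mycluster.nn2", "nn-host2:8020");

        // Shared edit log stored on a quorum of Journal Nodes (QJM);
        // only the active namenode may write to it, the standby only reads.
        conf.set("dfs.namenode.shared.edits.dir",
                 "qjournal://jn1:8485;jn2:8485;jn3:8485/mycluster");

        // Fencing method used during failover (sshfence is one built-in option).
        conf.set("dfs.ha.fencing.methods", "sshfence");

        // Let the ZooKeeper failover controller handle automatic failover.
        conf.setBoolean("dfs.ha.automatic-failover.enabled", true);
    }
}
```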

Datanode Failure:

In HDFS, to prevent the failure or loss of data we use replication, where a single data block is stored in multiple locations. By default the replication factor for HDFS is 3, which means that along with the original block there will be two replicas.

Hadoop’s default strategy is to place the first replica on the same node as the client (for clients running outside the cluster, a node is chosen at random, although the system tries not to pick nodes that are too full or too busy). The second replica is placed on a different rack from the first (off-rack), chosen at random. The third replica is placed on the same rack as the second, but on a different node chosen at random. Further replicas are placed on random nodes in the cluster, although the system tries to avoid placing too many replicas on the same rack.
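As an illustration only, the placement rules above can be sketched like this. This is not Hadoop's actual BlockPlacementPolicyDefault code; the Node type and rack names are made up for the example.

```java
import java.util.*;
import java.util.function.Predicate;

/** Simplified illustration of the default replica placement described above. */
public class ReplicaPlacementSketch {

    record Node(String name, String rack) {}

    static List<Node> chooseTargets(Node writer, List<Node> cluster, Random rnd) {
        // 1st replica: on the writer's node, or a random node for external clients.
        Node first = (writer != null) ? writer
                                      : cluster.get(rnd.nextInt(cluster.size()));
        // 2nd replica: a random node on a different rack (off-rack).
        Node second = pick(cluster, rnd, n -> !n.rack().equals(first.rack()));
        // 3rd replica: same rack as the second, but a different node.
        Node third = pick(cluster, rnd,
                n -> n.rack().equals(second.rack()) && !n.equals(second));
        return List.of(first, second, third);
    }

    private static Node pick(List<Node> cluster, Random rnd, Predicate<Node> ok) {
        List<Node> candidates = cluster.stream().filter(ok).toList();
        return candidates.get(rnd.nextInt(candidates.size()));
    }
}
```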

Each datanode continuously sends heartbeat signals to the namenode, every 3 seconds by default. If the namenode does not receive a heartbeat from a datanode for 10 minutes (by default), it considers that datanode to be dead. The namenode then checks which blocks were stored on the dead datanode and initiates their re-replication, so that the replication strategy discussed above is again satisfied. Even if a datanode goes down, the namenode will provide the address of a replica, so that there won't be any interruption.
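This behaviour is driven by a few HDFS settings; a minimal sketch follows, using what I believe are the default values (normally configured in hdfs-site.xml rather than in code).

```java
import org.apache.hadoop.conf.Configuration;

public class DatanodeTimeoutSketch {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setInt("dfs.replication", 3);                // replication factor (default 3)
        conf.setLong("dfs.heartbeat.interval", 3);        // datanode heartbeat, in seconds
        conf.setInt("dfs.namenode.heartbeat.recheck-interval", 300_000); // in milliseconds

        // The namenode treats a datanode as dead after roughly
        // 2 * recheck-interval + 10 * heartbeat-interval,
        // i.e. 2 * 300s + 10 * 3s = 630s, which is the ~10 minutes mentioned above.
        long deadTimeoutMs = 2 * 300_000L + 10 * 3_000L;
        System.out.println("Dead-node timeout ≈ " + (deadTimeoutMs / 1000) + "s");
    }
}
```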

Now let us look at failures in YARN. Hadoop handles the failures of the Application Master, the Resource Manager, the Node Manager and Tasks.

Application Master Failure:

If an Application Master fails in Hadoop, then all the information related to that particular job execution will be lost. By default the maximum number of attempts to run an application master is 2, so if an application master fails twice it will not be tried again and the job will fail. This can be controlled using the property yarn.resourcemanager.am.max-attempts.

An application master sends periodic heartbeats to the resource manager. In the event of an application master failure, the resource manager will detect the failure and start a new instance of the master running in a new container. The new instance will use the job history to recover the state of the tasks that were already run by the (failed) application so they don't have to be rerun. Recovery is enabled by default, but can be disabled by setting yarn.app.mapreduce.am.job.recovery.enable to false.
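A minimal sketch of the two properties mentioned above, set programmatically through the MapReduce Job API (they would normally go in yarn-site.xml and mapred-site.xml); the job name is just a placeholder.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class AmFailureConfigSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        // Maximum number of application master attempts (default 2).
        conf.setInt("yarn.resourcemanager.am.max-attempts", 2);

        // Recover the state of already-completed tasks when a new AM
        // instance is started (enabled by default).
        conf.setBoolean("yarn.app.mapreduce.am.job.recovery.enable", true);

        Job job = Job.getInstance(conf, "failure-handling-demo"); // placeholder job name
        // ... set mapper, reducer, input and output paths as usual ...
    }
}
```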

The MapReduce client polls the application master for progress reports, but if its application master fails, the client needs to locate the new instance. During job initialization, the client asks the resource manager for the application master’s address, and then caches it so it doesn’t overload the resource manager with a request every time it needs to poll the application master. If the application master fails, however, the client will experience a timeout when it issues a status update, at which point the client will go back to the resource manager to ask for the new application master’s address.

Node Manager Failure:

The node manager sends periodic heartbeat signals to the resource manager. If the node manager fails, by crashing or running very slowly, the resource manager will wait up to 10 minutes for a heartbeat. If none is received, the resource manager removes the node from its pool of nodes on which to schedule containers.

If an AM was running on the failed node manager, the AM will be launched on another node, and this will not be considered as an attempt because it was not the fault of the AM (it is not counted against the AM max-attempts counter). Completed map tasks that ran on the dead node manager also have to be rerun if they belong to incomplete jobs, since their intermediate output, which resides on the failed node manager's local filesystem, may not be accessible to the reduce tasks.

Node managers may also be blacklisted if the number of failures for an application is high, even if the node manager itself has not failed. Blacklisting is done by the application master: by default, if more than three tasks fail on a node manager, the application master blacklists it.
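A minimal sketch of the settings behind this behaviour, using what I believe are the default values (normally configured in yarn-site.xml and mapred-site.xml).

```java
import org.apache.hadoop.conf.Configuration;

public class NodeManagerFailureSketch {
    public static void main(String[] args) {
        Configuration conf = new Configuration();

        // How long the resource manager waits for a node manager heartbeat
        // before considering the node lost (10 minutes by default).
        conf.setLong("yarn.resourcemanager.nm.liveness-monitor.expiry-interval-ms",
                     600_000L);

        // Number of task failures on a node before the MapReduce application
        // master blacklists that node for the job (3 by default).
        conf.setInt("mapreduce.job.maxtaskfailures.per.tracker", 3);
    }
}
```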

Resource Manager Failure:

Without high availability, the Resource Manager is a single point of failure in YARN. To achieve high availability (HA), it is necessary to run a pair of resource managers in an active-standby configuration. If the active resource manager fails, the standby can take over without a significant interruption to the client.

Information about all the running applications is stored in a highly available state store (backed by ZooKeeper or HDFS), so that the standby can recover the core state of the failed active resource manager. Node manager information is not stored in the state store since it can be reconstructed relatively quickly by the new resource manager as the node managers send their first heartbeats.

The transition of a resource manager from standby to active is handled by a failover controller. The default failover controller is an automatic one, which uses ZooKeeper leader election to ensure that there is only a single active resource manager at one time.
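A minimal sketch of a resource manager HA setup along these lines; the rm ids, host names and ZooKeeper quorum are hypothetical placeholders, and these properties normally live in yarn-site.xml.

```java
import org.apache.hadoop.conf.Configuration;

public class RmHaConfigSketch {
    public static void main(String[] args) {
        Configuration conf = new Configuration();

        // A pair of resource managers in active-standby configuration.
        conf.setBoolean("yarn.resourcemanager.ha.enabled", true);
        conf.set("yarn.resourcemanager.ha.rm-ids", "rm1,rm2");
        conf.set("yarn.resourcemanager.hostname.rm1", "rm-host1");
        conf.set("yarn.resourcemanager.hostname.rm2", "rm-host2");

        // Persist application state in ZooKeeper so the standby can recover it.
        conf.setBoolean("yarn.resourcemanager.recovery.enabled", true);
        conf.set("yarn.resourcemanager.store.class",
                 "org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore");
        conf.set("yarn.resourcemanager.zk-address", "zk1:2181,zk2:2181,zk3:2181");
    }
}
```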

Task Failure:

Task failures generally occur due to runtime exceptions or the sudden exit of the task JVM. A task sends progress updates to the AM every 3 seconds, and if the AM doesn't receive any update for 10 minutes, it considers the task failed and reruns the task attempt.

When the application master is notified of a task attempt that has failed, it will reschedule execution of the task. The application master tries to avoid rescheduling the task on a node manager where it has previously failed. Furthermore, if a task fails four times (by default), it will not be retried again and the job is marked as failed.
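A minimal sketch of the task-failure settings behind this behaviour, with their default values (normally set in mapred-site.xml or per job).

```java
import org.apache.hadoop.conf.Configuration;

public class TaskFailureConfigSketch {
    public static void main(String[] args) {
        Configuration conf = new Configuration();

        // A task that reports no progress for this long is marked as failed
        // (10 minutes by default).
        conf.setLong("mapreduce.task.timeout", 600_000L);

        // Number of attempts before a map or reduce task is given up on
        // and the job is failed (4 by default).
        conf.setInt("mapreduce.map.maxattempts", 4);
        conf.setInt("mapreduce.reduce.maxattempts", 4);
    }
}
```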

A task attempt may also be killed, which is different from it failing. A task attempt may be killed because it is a speculative duplicate, or because the node manager it was running on failed and the application master marked all the task attempts running on it as killed. Killed task attempts do not count against the number of attempts to run the task, because it wasn't the task's fault that the attempt was killed.

These are the different types of failures in Hadoop and how they are handled automatically. I hope you liked this article. Happy Learning!!!
